1
2
3
4 """
5 Executes CUs in parallel-mode, preparing ingest files.
6
7 @author: E. Sutorius
8 @org: WFAU, IfA, University of Edinburgh
9
10 @newfield contributors: Contributors, Contributors (Alphabetical Order)
11 @contributors: R.S. Collins
12 """
13
14 from collections import defaultdict
15 import inspect
16 import math
17 import multiprocessing
18 import mx.DateTime as mxTime
19 import os
20 import re
21 import shutil
22 import time
23
24 from invocations.cu1.cu1 import Cu1
25 from invocations.cu1.cu1Transfer import CASUQueries
26 from invocations.cu2.cu2 import Cu2
27 from invocations.cu3.cu3 import Cu3
28 from invocations.cu4.cu4 import Cu4
29
30 from wsatools.CLI import CLI
31 import wsatools.CSV as CSV
32 import wsatools.DbConnect.DbConstants as dbc
33 from wsatools.DbConnect.DbSession import DbSession
34 from wsatools.File import File
35 import wsatools.FitsUtils as fits
36 from wsatools.DbConnect.DbSession import Ingester
37 from wsatools.DbConnect.IngCuSession import IngCuSession
38 from wsatools.DbConnect.IngIngester import IngestLogger
39 import wsatools.DbConnect.Schema as schema
40 from wsatools.SystemConstants import SystemConstants
41 import wsatools.Utilities as utils
class ThreadCU(object):
    """ Runs a list of CU objects split across multiple worker processes.
    """
    # Default number of parallel worker processes; callers override this
    # on the class (e.g. ThreadCU.numprocessors = numThreads) before
    # instantiating.
    numprocessors = 2

    def runThreads(self, _cuRuns, resultQueue):
        """
        Run the given CUs sequentially inside one worker process.

        @param _cuRuns: The CU objects to run in this process.
        @type _cuRuns: list
        @param resultQueue: Queue receiving the CUs' output prefix lists.
        @type resultQueue: multiprocessing.Queue

        """
        outList = []
        for _cu in _cuRuns:
            _cu.run()
            outList.extend(_cu.outPrefixList)
        resultQueue.put(outList)

    def run(self, cuList):
        """
        Thread the CUs.

        @param cuList: The CUs to run in parallel.
        @type cuList: list

        @return: List containing results of the CU runs.
        @rtype: list
        """
        # Never start more processes than there are CUs to run.
        if self.numprocessors > len(cuList):
            self.numprocessors = len(cuList)
        IngestLogger.addMessage("Running %d CUs on %d processors." % (
            len(cuList), self.numprocessors))
        if self.numprocessors == 0:
            raise SystemExit("<Error> No CUs available!")

        outQueue = multiprocessing.Queue()
        chunksize = int(math.ceil(len(cuList) / float(self.numprocessors)))
        procs = []
        for i in range(self.numprocessors):
            if i > 0:
                # Stagger process start-up to ease resource contention.
                time.sleep(20)
            p = multiprocessing.Process(
                target=self.runThreads,
                args=(cuList[chunksize * i:chunksize * (i + 1)],
                      outQueue))
            procs.append(p)
            p.start()

        # Drain the queue BEFORE joining: a child process blocks on its
        # queue put() until the data is consumed, so joining first could
        # deadlock.
        resultList = []
        for i in range(self.numprocessors):
            resultList.extend(outQueue.get())

        for p in procs:
            p.join()

        return resultList
109
class DataBuilder(IngCuSession):
    """ Executes CUs in parallel-mode, preparing ingest files.
    """
    # Class attributes; presumably assigned from the "-k/--disklist" and
    # "-w/--casudisk" command-line options in __main__ -- TODO confirm.
    diskList = None   # CU2: list of RAID disk paths
    casuDisk = None   # CU1: data disk at CASU
117
118
119
def __init__(self,
             cuNums=CLI.getOptDef("cunums"),
             curator=CLI.getOptDef("curator"),
             database=DbSession.database,
             beginDate=CLI.getOptDef("begin"),
             endDate=CLI.getOptDef("end"),
             versionStr=CLI.getOptDef("version"),
             outPath=CLI.getOptDef("outpath"),
             noMfID=CLI.getOptDef("nomfid"),
             ReDo=CLI.getOptDef("redo"),
             programmeList=CLI.getOptDef("programmes"),
             progOrder=CLI.getOptDef("progorder"),
             isTrialRun=DbSession.isTrialRun,
             xferlog=CLI.getOptDef("xferlog"),
             timeCheck=CLI.getOptDef("timecheck"),
             numThreads=CLI.getOptDef("threads"),
             writeMfID=CLI.getOptDef("writemfid"),
             subDir=CLI.getOptDef("subdir"),
             janet=CLI.getOptDef("janet"),
             forceXfer=CLI.getOptDef("force"),
             excludeData=CLI.getOptDef("exclude"),
             deprecation=CLI.getOptDef("deprecation"),
             reproMode=CLI.getOptDef("repromode"),
             forceMosaic=CLI.getOptDef("mosaic"),
             detectionSubset=CLI.getOptDef("subset"),
             keepWorkDir=CLI.getOptDef("keepwork"),
             ffluOnly=CLI.getOptDef("ffluonly"),
             getHeader=CLI.getOptDef("getheader"),
             gesOptions=CLI.getOptDef("gesoptions"),
             comment=CLI.getArgDef("comment")):
    """
    @param beginDate: First date to process, eg. 20050101.
    @type beginDate: str
    @param comment: Descriptive comment as to why curation task is
                    being performed.
    @type comment: str
    @param cuNums: Curation task numbers.
    @type cuNums: list[int]
    @param curator: Name of curator.
    @type curator: str
    @param database: Name of the database to connect to.
    @type database: str
    @param detectionSubset: Process subset of [Astrometry table,
                            Photometry table, Raw table].
    @type detectionSubset: list(str)
    @param endDate: Last date to process, eg. 20050131.
    @type endDate: str
    @param excludeData: Exclude given files from processing (CU4).
    @type excludeData: list(str)
    @param ffluOnly: Don't transfer but update FlatfileLookup table.
    @type ffluOnly: bool
    @param forceXfer: Force the data transfer.
    @type forceXfer: bool
    @param gesOptions: Options for GES transfers.
    @type gesOptions: str
    @param getHeader: Only transfer FITS headers.
    @type getHeader: bool
    @param isTrialRun: If True, do not perform database modifications.
    @type isTrialRun: bool
    @param janet: Use JANET instead of UKLight.
    @type janet: bool
    @param forceMosaic: Create jpg for mosaic.
    @type forceMosaic: bool
    @param keepWorkDir: Don't remove working directory.
    @type keepWorkDir: bool
    @param noMfID: If True, don't write new MfID into FITS file.
    @type noMfID: bool
    @param numThreads: Number of scp transfer or processing threads.
    @type numThreads: int
    @param outPath: Directory where data is written to.
    @type outPath: str
    @param programmeList: Only process data for given programmes (accepts
                          keywords 'all', 'ns' (non-survey),
                          'ukidss' (all 5 main surveys)).
    @type programmeList: list(str)
    @param progOrder: The order of processing of the programmes, all
                      programmes not explicitely named can be put
                      anywhere in the list as 'others'.
    @type progOrder: list(str)
    @param ReDo: If True, overwrite existing MfID (CU1/3) or jpg
                 (CU2).
    @type ReDo: bool
    @param reproMode: Mode for reprocessed data: otm (one-to-many dirs)
                      or datedir suffix.
    @type reproMode: str
    @param subDir: The subdirectory containing FITS files.
    @type subDir: str
    @param timeCheck: Determines if existing files should be checked for
                      their timestamp against the same file at CASU;
    @type timeCheck: bool
    @param versionStr: Version number of the data.
    @type versionStr: str
    @param writeMfID: If True, only write MfIDs into FITS files.
    @type writeMfID: bool
    @param xferlog: Logfile containing files to be ingested.
    @type xferlog: str

    """
    super(DataBuilder, self).__init__(cuNum=0,
                                      curator=curator,
                                      comment=comment,
                                      reqWorkDir=True,
                                      keepWorkDir=keepWorkDir,
                                      database=database,
                                      autoCommit=False,
                                      isTrialRun=isTrialRun)

    # Normalise the user-supplied dates against the observation calendar.
    beginDate, endDate = \
        self.sysc.obsCal.getDatesFromInput(beginDate, endDate)

    typeTranslation = {"curator": str,
                       "database": str,
                       "cuNums": list,
                       "beginDate": str,
                       "endDate": str,
                       "versionStr": str,
                       "outPath": str,
                       "noMfID": bool,
                       "ReDo": bool,
                       "programmeList": list,
                       "progOrder": list,
                       "isTrialRun": bool,
                       "xferlog": str,
                       "timeCheck": bool,
                       "numThreads": type(self.sysc.scpThreads),
                       "writeMfID": bool,
                       "subDir": type(self.sysc.fitsDir),
                       "janet": bool,
                       "forceXfer": bool,
                       "excludeData": list,
                       "deprecation": str,
                       "reproMode": str,
                       "forceMosaic": bool,
                       "detectionSubset": list,
                       "keepWorkDir": bool,
                       "ffluOnly": bool,
                       "getHeader": bool,
                       "gesOptions": str,
                       "comment": str}

    # Copy all constructor arguments onto self as underscore-prefixed
    # attributes, cast to the types listed above.
    super(DataBuilder, self).attributesFromArguments(
        inspect.getargspec(DataBuilder.__init__)[0], locals(), prefix='_',
        types=typeTranslation)

    self.curator = self._curator
    self.comment = self._comment
    self._UKLight = not self._janet

    # Default the data version to the highest version known for either
    # end of the date range.
    if not self._versionStr:
        self._versionStr = str(max(
            self.sysc.obsCal.maxVersOfDate(self._beginDate),
            self.sysc.obsCal.maxVersOfDate(self._endDate)))

    if not self._outPath:
        self._outPath = self.sysc.dbSharePath()

    implementedCUs = [1, 2, 3, 4]
    if not set(self._cuNums).intersection(implementedCUs):
        raise SystemExit("No CU possible for %r.\nImplemented CUs are %r"
                         % (self._cuNums, implementedCUs))

    if set(self._cuNums) - set(implementedCUs):
        IngestLogger.addMessage(
            "<Warning> The following CUs are not implemented: " +
            ', '.join(map(str, set(self._cuNums) - set(implementedCUs))))

    # Either an explicit transfer log or a real (non-default) date range
    # is required, except for one-to-many reprocessing mode.
    if set(self._cuNums).intersection([1, 2, 3, 4]) and not self._xferlog \
            and (int(self._beginDate) == int(CLI.getOptDef("begin")) or
                 int(self._endDate) == int(CLI.getOptDef("end"))) \
            and 'otm' not in self._reproMode:
        raise SystemExit("The CU needs either a xferlog or a date range "
                         "specified!")

    if 'otm' in self._reproMode and not self._xferlog and not self.casuDisk:
        raise SystemExit("Either a transfer log or a directory at CASU "
                         "need to be specified for reprocessed data "
                         "transfers.")

    if 1 in self._cuNums:
        # numThreads encodes two digits: tens = daywise threads,
        # units = scp threads per day; ensure both digits are present.
        if self._numThreads < 10:
            self._numThreads += 10
        if self._ffluOnly or self._xferlog:
            newThreads = 11
        elif self._numThreads % 10 * self._numThreads // 10 > 4:
            # NOTE(review): this evaluates as ((n % 10) * n) // 10;
            # (n % 10) * (n // 10) may have been intended -- confirm
            # before changing.
            newThreads = (22 if self._endDate != self._beginDate else 12)
        else:
            newThreads = self._numThreads
        if self._numThreads != newThreads:
            IngestLogger.addMessage(
                "<Warning> Too many threads chosen: %d => Resetting to %d"
                % (self._numThreads, newThreads))
            self._numThreads = newThreads

    if self._xferlog:
        self._xferlog = os.path.abspath(self._xferlog)
317
318
320 """ Run each CU requested.
321 """
322 self._cuRuns = []
323 self._cuedFiles = defaultdict(bool)
324 cuEventIDs = defaultdict(list)
325 if 1 in self._cuNums:
326 if self._writeMfID:
327
328 IngestLogger.addMessage("Starting updating multiframeIDs.")
329 fitsDirs = self.createDailyLists(self._xferlog)
330 self.updateMfIDs(fitsDirs.dailyFileListDict)
331 IngestLogger.addMessage("Finished updating multiframeIDs.")
332
333 cuEventIDs[1].append(self._cuEventID)
334 cu0Log = IngestLogger(self.createLogFileName(cuEventIDs[1]))
335 IngestLogger.append(cu0Log.pathName)
336 else:
337 fileDict = defaultdict(list)
338 if self._numThreads == 0:
339 self._numThreads = SystemConstants.scpThreads
340
341 if self._xferlog and 'otm' not in self._reproMode:
342 fileDict, notAvailableFiles = self.parseXferList()
343 if notAvailableFiles:
344 IngestLogger.addMessage(
345 "<Warning> The following files are not available at "
346 "CASU:")
347 for fileName in notAvailableFiles:
348 IngestLogger.addMessage(fileName)
349
350 mjdList = [int(mxTime.strptime(str(date), "%Y%m%d").mjd)
351 for date in fileDict if fileDict[date]]
352 outMessageSuf = ''.join([" from ", self._xferlog])
353 else:
354 startMJD = int(mxTime.strptime(self._beginDate,
355 "%Y%m%d").mjd)
356 endMJD = int(mxTime.strptime(self._endDate,
357 "%Y%m%d").mjd)
358
359
360 if self._reproMode.startswith(('p:', 's:')):
361 CASUQueries.sysc = self.sysc
362 presufCasu = (self._reproMode
363 if self._reproMode.startswith(('p', 's'))
364 else '')
365 casuDateDict = CASUQueries.getCasuDateDirs(
366 dirList=[self.casuDisk] if self.casuDisk else [],
367 UKLight=self._UKLight,
368 mode="STR", presuf=presufCasu)
369 reproFix = self._reproMode.partition("::")[2]
370 mjdList = []
371 for dateDir in casuDateDict:
372 if reproFix in dateDir:
373 date = int(mxTime.strptime(dateDir.replace(
374 reproFix, ''), "%Y%m%d").mjd)
375 if date in range(startMJD, endMJD + 1):
376 mjdList.append(date)
377 elif self.sysc.isGES():
378 mjdList = self.getGesMJDs(startMJD, endMJD)
379 elif 'otm' not in self._reproMode and not self.sysc.isGES():
380
381 CASUQueries.sysc = self.sysc
382 casuDateDict = CASUQueries.getCasuDateDirs(
383 dirList=[self.casuDisk] if self.casuDisk else [],
384 UKLight=self._UKLight, mode="MJD")
385 mjdList = [x for x in range(startMJD, endMJD + 1)
386 if x in casuDateDict]
387 else:
388 mjdList = range(startMJD, endMJD + 1)
389 outMessageSuf = ''
390
391 if not mjdList:
392 IngestLogger.addMessage("No data to be transferred!")
393
394
395 if 'otm' in self._reproMode:
396 IngestLogger.addMessage(''.join([
397 "Preparing CU run for Cu1 for re-processed data in ",
398 self.casuDisk]))
399 self._cuRuns.append(
400 Cu1(curator=self.curator,
401 database=self.database,
402 versionStr=self._versionStr,
403 casuDisk=self.casuDisk,
404 janet=self._janet,
405 deprecation=self._deprecation,
406 xferList=self._xferlog,
407 isTrialRun=self._isTrialRun,
408 forceXfer=self._forceXfer,
409 reproMode=self._reproMode,
410 comment=self.comment))
411 IngestLogger.addMessage("CUEventID: %s" %
412 self._cuRuns[-1]._cuEventID)
413 else:
414 dateList = []
415 for mjd in sorted(mjdList):
416 date = mxTime.DateTimeFromMJD(mjd)
417 dateStr = "%04d%02d%02d" % (date.year, date.month,
418 date.day)
419 dateList.append(dateStr)
420
421
422 dateSubLists = self.splitDateList(dateList)
423
424 for dateGroup in dateSubLists:
425 xferList = []
426 for date in dateGroup:
427 xferList.extend(fileDict[date])
428 IngestLogger.addMessage(
429 "Preparing CU run for Cu1 for %r %s" % (
430 dateGroup, outMessageSuf))
431 self._cuRuns.append(
432 Cu1(curator=self.curator,
433 database=self.database,
434 runDate=dateGroup,
435 versionStr=self._versionStr,
436 casuDisk=self.casuDisk,
437 timeCheck=self._timeCheck,
438 numThreads=self._numThreads,
439 janet=self._janet,
440 deprecation=self._deprecation,
441 xferList=xferList,
442 isTrialRun=self._isTrialRun,
443 forceXfer=self._forceXfer,
444 reproMode=self._reproMode,
445 ffluOnly=self._ffluOnly,
446 onlyHeader=self._getHeader,
447 gesOptions=self._gesOptions,
448 comment=self.comment))
449
450 dateVersStr = ','.join([
451 ("%s_v%s") % (dateStr, self._versionStr)
452 for dateStr in dateGroup])
453 self.updateComment(dateVersStr, cuNum=1)
454 IngestLogger.addMessage(
455 "Transferring from %s" %
456 self.sysc.scpServer(light=not self._janet))
457 IngestLogger.addMessage("CUEventID: %s" %
458 self._cuRuns[-1]._cuEventID)
459
460 for _cu in self._cuRuns:
461 cuEventIDs[1].append(_cu._cuEventID)
462 cu0Log = IngestLogger(self.createLogFileName(cuEventIDs[1]))
463 IngestLogger.append(cu0Log.pathName)
464
465 for _cu in self._cuRuns:
466 _cu.run()
467 if 'mv' in self._deprecation and not self._isTrialRun:
468 self.deprecateJpgs(_cu.deprFileList)
469 if _cu.allXferLogs and not self._isTrialRun \
470 and 'otm' not in self._reproMode \
471 and not self.sysc.isGES():
472 self.updateMfIDs(_cu.allXferLogs)
473 del _cu
474 IngestLogger.addMessage("Finished CU1.")
475
476 if 2 in self._cuNums:
477 cu2OutPath = ('' if self._outPath == self.sysc.dbSharePath()
478 else self._outPath)
479 fitsDirs = self.createDailyLists(self._xferlog)
480
481 if self._xferlog:
482 _dateList = sorted(fitsDirs.dailyFileListDict.keys())
483 self._beginDate = _dateList[0].partition("_v")[0]
484 self._endDate = _dateList[-1].partition("_v")[0]
485 self._versionStr = _dateList[0].partition("_v")[2]
486 startMJD = int(mxTime.strptime(self._beginDate, "%Y%m%d").mjd)
487 endMJD = int(mxTime.strptime(self._endDate, "%Y%m%d").mjd)
488
489 for mjd in xrange(startMJD, endMJD + 1):
490 date = mxTime.DateTimeFromMJD(mjd)
491 dateStr = "%04d%02d%02d" % (date.year, date.month, date.day)
492 dateVersStr = "%s_v%s" % (dateStr, self._versionStr)
493 if dateVersStr in fitsDirs.invFitsDateDict:
494 cu02edFlagList = []
495 try:
496 cu02edFlagList = [os.path.join(
497 fitsDirs.invFitsDateDict[dateVersStr], dateVersStr,
498 "CU02ED_" + self._database.rpartition('.')[2])]
499 except AttributeError:
500 for path in fitsDirs.invFitsDateDict[dateVersStr]:
501 cu02edFlagList.append(os.path.join(
502 path, dateVersStr,
503 "CU02ED_" + self._database.rpartition('.')[2]))
504 for cu02edFlag in cu02edFlagList:
505 if os.path.exists(cu02edFlag):
506 os.remove(cu02edFlag)
507 IngestLogger.addMessage(
508 "Preparing CU run for Cu2 for " + dateStr)
509
510 xferlog = (fitsDirs.dailyFileListDict[dateVersStr][0]
511 if self._xferlog else None)
512
513 self._cuRuns.append(
514 Cu2(curator=self.curator,
515 database=self.database,
516 beginDate=int(dateStr),
517 endDate=int(dateStr),
518 versionStr=self._versionStr,
519 xferLog=xferlog,
520 subDir=self._subDir,
521 jpgPath=cu2OutPath,
522 forceMosaic=self._forceMosaic,
523 reDo=self._ReDo,
524 numThreads=self._numThreads,
525 deprecation=self._deprecation,
526 isTrialRun=self._isTrialRun,
527 comment=self.comment))
528
529 self.updateComment(dateVersStr, cuNum=2)
530
531
532 cuEventIDs[2] = [_cu._cuEventID for _cu in self._cuRuns]
533 cu0Log = IngestLogger(self.createLogFileName(cuEventIDs[2]))
534 IngestLogger.append(cu0Log.pathName)
535
536 for _cu in self._cuRuns:
537 _cu.run()
538 IngestLogger.addMessage("Finished CU2.")
539
540 if 3 in self._cuNums:
541 fitsDirs = self.createDailyLists(self._xferlog)
542 for dateKey in sorted(fitsDirs.dailyFileListDict):
543 for filePath in fitsDirs.dailyFileListDict[dateKey]:
544 listFile = File(filePath)
545 dateVersStr = listFile.root[listFile.root.find("_v") - 8:]
546 IngestLogger.addMessage(
547 "Preparing CU run for Cu3 for " + dateVersStr)
548
549 self._cuRuns.append(
550 Cu3(curator=self.curator,
551 database=self.database,
552 csvSharePath=self._outPath,
553 programmeList=self._programmeList,
554 noMfID=self._noMfID,
555 ReDo=self._ReDo,
556 onlyMfID=self._writeMfID,
557 excludeFiles=self._excludeData,
558 isTrialRun=self._isTrialRun,
559 keepWorkDir=self._keepWorkDir,
560 forceComp=self._forceXfer,
561 isWfauProduct=("products" in self._subDir),
562 xferlog=filePath,
563 comment=self.comment))
564 self.updateComment(dateVersStr, cuNum=3)
565
566 cuEventIDs[3] = [_cu._cuEventID for _cu in self._cuRuns]
567 cu0Log = IngestLogger(self.createLogFileName(cuEventIDs[3]))
568 IngestLogger.append(cu0Log.pathName)
569
570 for _cu in self._cuRuns:
571 if not self._writeMfID:
572 _cu.run()
573 self._cuedFiles[_cu._cuEventID] = _cu.cuedFileExists
574 IngestLogger.addMessage("Finished CU3.")
575
576 self._cuRuns = []
577 self._ingLogList = []
578 if 4 in self._cuNums:
579 fitsDirs = self.createDailyLists(self._xferlog)
580 for dateKey in sorted(fitsDirs.dailyFileListDict):
581 for filePath in fitsDirs.dailyFileListDict[dateKey]:
582 listFile = File(filePath)
583 dateVersStr = listFile.root[listFile.root.find("_v") - 8:]
584 cu03edFlagList = []
585 flatFileIngFlagList = []
586 parentArchive = (self.database.rpartition('.')[2]
587 if "VSA_v1_3" in self.database else
588 "VSA" if sysc.isVSA() else
589 ("OSA" if sysc.isOSA() else "WSA"))
590 try:
591 flatFileIngFlagList = [os.path.join(
592 fitsDirs.invFitsDateDict[dateVersStr], dateVersStr,
593 "FFFIED_" + parentArchive)]
594 cu03edFlagList = [os.path.join(
595 fitsDirs.invFitsDateDict[dateVersStr], dateVersStr,
596 "CU03ED_" + parentArchive)]
597 except AttributeError:
598 for path in fitsDirs.invFitsDateDict[dateVersStr]:
599 flatFileIngFlagList.append(os.path.join(
600 path, dateVersStr,
601 "FFFIED_" + parentArchive))
602 cu03edFlagList.append(os.path.join(
603 path, dateVersStr,
604 "CU03ED_" + parentArchive))
605
606 if not self._forceXfer \
607 and not all(os.path.exists(fffiedFlag)
608 for fffiedFlag in flatFileIngFlagList):
609
610 IngestLogger.addMessage("<IMPORTANT> "
611 "FinaliseFlatFileIngest hasn't been run for %s. "
612 "For WFAU products just re-run this command with the"
613 " -f/--force option. For new CASU products you must "
614 "first run FinaliseFlatFileIngest." % dateVersStr)
615
616 elif self._forceXfer or all(os.path.exists(cu03edFlag)
617 for cu03edFlag in cu03edFlagList):
618 IngestLogger.addMessage(
619 "Preparing CU run for Cu4 for " + dateVersStr)
620
621 self._cuRuns.append(
622 Cu4(curator=self.curator,
623 database=self.database,
624 csvSharePath=self._outPath,
625 programmeList=self._programmeList,
626 progOrder=self._progOrder,
627 detectionSubset=self._detectionSubset,
628 excludeFiles=self._excludeData,
629 isTrialRun=self._isTrialRun,
630 ReDo=self._ReDo,
631 keepWorkDir=self._keepWorkDir,
632 xferlog=filePath,
633 comment=self.comment))
634 self.updateComment(dateVersStr, cuNum=4)
635 else:
636 IngestLogger.addMessage(
637 "No data available for Cu4 for " + dateVersStr)
638
639 if self._numThreads == 0:
640 self._numThreads = 1
641
642 cuEventIDs[4] = [_cu._cuEventID for _cu in self._cuRuns]
643 cu0Log = IngestLogger(self.createLogFileName(cuEventIDs[4]))
644 IngestLogger.append(cu0Log.pathName)
645
646 ThreadCU.numprocessors = self._numThreads
647 threadCUs = ThreadCU()
648 resList = threadCUs.run(self._cuRuns)
649 del threadCUs
650
651 IngestLogger.addMessage("Finished CU4.")
652
653 IngestLogger.addMessage("Overview of all the ingests to run:")
654 for cuNum in cuEventIDs:
655 if cuNum == 4:
656 for outPrefix, cuEvID in resList:
657 self._ingLogList.append((
658 os.path.join(self.dbMntPath, outPrefix),
659 self._cuedFiles[cuEvID]))
660 else:
661 for cuEventID in cuEventIDs[cuNum]:
662 outPrefix = self.createFilePrefix(
663 cuNum, cuEventID, self._hostName,
664 self.database.rpartition('.')[2])
665 self._ingLogList.append((
666 os.path.join(self.dbMntPath, outPrefix),
667 self._cuedFiles[cuEventID]))
668 for entry in sorted(set(self._ingLogList)):
669 forceFlag = (" -F" if entry[1] else '')
670 IngestLogger.addMessage("./IngestCUFiles.py -i -r %s%s %s" % (
671 os.path.join(self.dbMntPath, entry[0] + ".log"), forceFlag,
672 self.database))
673
674
675 IngestLogger.append(cu0Log.pathName)
676
677
678
@staticmethod
def chunks(l, n):
    """ Yield successive n-sized chunks from l.

    @param l: Sequence to split.
    @type l: list
    @param n: Chunk size; 0 is treated as 1.
    @type n: int

    @return: Generator yielding slices of l.
    @rtype: generator
    """
    # Guard against a zero chunk size (infinite loop / ValueError).
    if n == 0:
        n = 1
    for i in range(0, len(l), n):
        yield l[i:i + n]
687
688
689
690 @staticmethod
704
705
706
def getGesMJDs(self, startMJD, endMJD):
    """ Get the list of MJDs for the GES transfer.

    @param startMJD: First MJD of the requested date range.
    @type startMJD: int
    @param endMJD: Last MJD of the requested date range.
    @type endMJD: int

    @return: MJDs available for transfer.
    @rtype: list(int)
    """
    # Archived/stacked data ('a'/'s') is not browsed by date directory;
    # just use the begin date.
    if self._gesOptions.endswith(('a', 's')):
        return [startMJD]

    CASUQueries.sysc = self.sysc
    mjdList = []
    casuSubDir = ("uves/proc_%s"
                  if self._gesOptions.startswith('u')
                  else "proccasu_%s")
    casuDirs = CASUQueries.getCasuStagingDirs(
        UKLight=self._UKLight,
        casuSubDirPath=casuSubDir % self._versionStr)

    for stagingDir in casuDirs:
        for line in CASUQueries.getDirListing(stagingDir, self._UKLight,
                                              "-p"):
            # Only date-named sub-directories count ("drwx... 20101231/").
            if line.strip().startswith('d'):
                subDir = line.strip().rpartition(' ')[2].replace('/', '')
                if subDir.isdigit():
                    date = int(mxTime.strptime(subDir, "%Y%m%d").mjd)
                    if startMJD <= date <= endMJD:
                        mjdList.append(date)
    return mjdList
731
732
733
def splitDateList(self, dateList):
    """ Split the dateList into multiple similar sized threads.

    @param dateList: Dates (yyyymmdd strings) to distribute over the
                     daywise threads.
    @type dateList: list(str)

    @return: Generator yielding the per-thread date sub-lists.
    @rtype: generator
    """
    CASUQueries.sysc = self.sysc
    try:
        # numThreads encodes [daywise][scp] threads as two digits; clamp
        # the daywise count to the number of available dates.
        if len(dateList) < self._numThreads // 10:
            newThreads = len(dateList) * 10 + self._numThreads % 10
            IngestLogger.addMessage(
                "<Warning> Short date list. Too many threads chosen:"
                " %d => Resetting to %d" % (
                    self._numThreads, newThreads))
            self._numThreads = newThreads
        if (self._numThreads // 10) == 1:
            newDateList = sorted(dateList)
        else:
            # Order dates by data volume at CASU to balance the chunks.
            casuDict = self.getCasuSizeDict(dateList, self._UKLight)
            newDateList = list(casuDict.keys())

    except Exception as error:
        # Best effort: fall back to the plain date list if the CASU size
        # query fails for any reason.
        IngestLogger.addExceptionDetails(error)
        newDateList = dateList

    return self.chunks(newDateList, self._numThreads // 10)
758
759
760
def createDailyLists(self, xferLog):
    """
    Create the daily filelists.

    @param xferLog: Logfile containing files to be ingested.
    @type xferLog: str

    @return: A FitsList object holding daily file information.
    @rtype: FitsList

    """
    fitsDirs = fits.FitsList(self.sysc, prefix="cu0_")
    fitsDirs.createFitsDateDict(ingestDirectory=self._subDir,
                                trafoLog=xferLog)
    IngestLogger.addMessage("Creating daily filelists.")
    fitsDirs.createDailyFileLists(self._workPath, int(self._beginDate),
                                  int(self._endDate), self._versionStr,
                                  xferLog)
    return fitsDirs
780
781
782
def deprecateJpgs(self, deprFitsList):
    """
    Deprecate jpgs of deprecated FITS files.

    Moves the jpgs belonging to the given deprecated FITS files into a
    time-stamped deprecation directory and re-points the compFile entry
    in MultiframeDetector at the new location.

    @param deprFitsList: List of deprecated FITS files.
    @type deprFitsList: list

    """
    # Time-stamped destination directory on the next disk with space.
    deprTime = utils.makeTimeStamp().split()[0].replace('-', '')
    spacePerDisk = utils.getDiskSpace(self.sysc.deprecatedDataStorage)
    deprJpgDir = os.path.join(utils.getNextDisk(self.sysc, spacePerDisk),
        '%s_%s' % (self.sysc.deprecatedComprImDir, deprTime))

    IngestLogger.addMessage("Deprecating JPGs to %s" % deprJpgDir)
    counter = 0
    self._connectToDb()
    dateList = \
        set(File(fitsFileName).subdir for fitsFileName in deprFitsList)

    # Map archive file names to multiframeIDs for the deprecated files.
    mfIDDict = {}
    for dateStr in dateList:
        results = self.archive.query(
            "fileName, multiframeID",
            "Multiframe",
            "fileName LIKE'%" + dateStr + "%'")
        for fileName, multiframeID in results:
            if fileName.rpartition(':')[2] in deprFitsList:
                mfIDDict[fileName] = str(multiframeID)

    if mfIDDict:
        oldJpgFiles = self.archive.query(
            selectStr="multiframeID, extNum, compFile",
            fromStr="MultiframeDetector",
            whereStr="multiframeID IN (%s)" % ','.join(mfIDDict.values()))

        for dateStr in dateList:
            deprJpgPath = os.path.join(deprJpgDir, dateStr)
            utils.ensureDirExist(deprJpgPath)
            for mfID, extNum, jpgFileName in oldJpgFiles:
                jpgFile = File(jpgFileName.replace(
                    self.sysc.pixelServerHostName, ''))
                if dateStr in jpgFile.name \
                        and os.path.exists(jpgFile.name):
                    deprJpgName = os.path.join(deprJpgPath, jpgFile.base)
                    if jpgFile.name != deprJpgName:
                        # Copy preserving metadata, then remove original.
                        shutil.copy2(jpgFile.name, deprJpgName)
                        os.remove(jpgFile.name)
                    else:
                        IngestLogger.addMessage(
                            "Deprecated file %s exists already!" %
                            deprJpgName)
                    compEntry = \
                        "'%s%s'" % (self.sysc.pixelServerHostName,
                                    deprJpgName)
                    counter += self.archive.update("MultiframeDetector",
                        [('compFile', compEntry)],
                        "multiframeID=%s AND extNum=%s" % (mfID, extNum))

    self._disconnectFromDb()
    IngestLogger.addMessage(
        "%s JPGs deprecated to %s." % (counter, deprJpgDir))
848
849
850
def parseXferList(self):
    """ Parse the transfer list and check for files.

    @return: Dictionary of full file paths per date and the list of
             files not available at CASU.
    @rtype: (defaultdict(list), list)
    """
    CASUQueries.sysc = self.sysc
    casuDateDict = {}
    casuFileSystem = CASUQueries.getCasuStagingDirs(self._janet)
    if self.casuDisk and "reprocessed" not in self.casuDisk:
        # (The original also carried an unreachable fall-back to
        # casuFileSystem here; casuDisk is always set in this branch.)
        casuDateDict = CASUQueries.getCasuDateDirs([self.casuDisk],
                                                   self._janet)
    elif not self.casuDisk:
        casuDateDict = CASUQueries.getCasuDateDirs(
            casuFileSystem, self._janet)

    notAvailFiles = []
    xferDict = defaultdict(list)
    for line in open(self._xferlog):
        entry = line.rstrip()
        # Entries are either "<date>/<file>" or a bare file name whose
        # date is embedded in the name.
        try:
            if '/' in entry:
                dateStr, fileName = entry.split('/')
            else:
                pattern = CASUQueries.re_digits
                # NOTE(review): if no digits match, .group() on None
                # raises AttributeError and is handled below -- confirm
                # basePath is always bound by then.
                dateStr = pattern.search(entry).group()
                fileName = entry

            if self._beginDate <= dateStr <= self._endDate:
                if dateStr in casuDateDict:
                    basePath = casuDateDict[dateStr]
                    if not isinstance(basePath, basestring) \
                            and len(basePath) > 1:
                        # Ambiguous: same date found on several disks.
                        raise AttributeError
                    else:
                        basePath = casuDateDict[dateStr][0]
                else:
                    basePath = self.casuDisk
                xferDict[dateStr].append(
                    os.path.join(basePath, dateStr, fileName))
        except KeyError:
            notAvailFiles.append(os.path.join(dateStr, fileName))
        except AttributeError:
            if isinstance(basePath, (list, tuple)) \
                    and not isinstance(basePath, basestring):
                IngestLogger.addMessage(
                    "Date %s found in multiple locations: %r" % (
                        dateStr, basePath))
            elif not basePath:
                IngestLogger.addMessage(
                    "Date not found on CASU disks: %r" % casuFileSystem)
            notAvailFiles.append(os.path.join(dateStr, fileName))

    return xferDict, notAvailFiles
906
907
908
947
948
949
def updateFlatFileLookUp(self, csvFileName):
    """
    Update the FlatFileLookUp table.

    Frames already known for a date are updated in place; new frames are
    assigned fresh multiframeIDs (CASU frames below 10^12, WFAU-created
    "e20..." frames above) and bulk-ingested from a CSV file.

    @param csvFileName: CSV file where data is written to.
    @type csvFileName: str

    """
    _ingestSchema = "WSA_CurationLogsSchema.sql"
    _tableName = 'FlatFileLookUp'
    fileList = [x[0] for x in self._fileList]
    dateList = sorted(set(os.path.basename(os.path.dirname(entry))
                          for entry in fileList))
    for dateVersStr in dateList:
        # Known multiframeIDs for this date; skipped entirely when a
        # forced redo of the MfID writing is requested.
        dataDict = ({} if self._ReDo and self._forceXfer
                    and self._writeMfID else
                    self.getMultiframeIDs(dateVersStr))

        mfidName = dbc.multiframeUIDAttributeName()
        _nextMfID = self._getNextID(mfidName, "FlatFileLookUp",
                                    where="%s<1000000000000" % mfidName)
        _nextWfauMfID = max(self._getNextID(mfidName, "FlatFileLookUp",
                            where="%s>1000000000000" % mfidName),
                            1000000000001)

        nrCasu = 0
        nrWfau = 0
        csvList = []
        directUpdate = False
        for entry in sorted(fileList):
            if entry.endswith(self.sysc.mefType):
                fName = File(entry)
                partName = os.path.join(fName.subdir, fName.base)
                if partName in dataDict:
                    # Frame already known: just re-point its file name.
                    self.archive.update("FlatFileLookUp",
                        [('cuEventID', self._cuEventID), ('fileName',
                        "'%s%s'" % (self.sysc.pixelServerHostName, entry))],
                        "multiframeID=%s" % dataDict[partName])
                    self.archive.commitTransaction()
                    nrWfau += 1
                    directUpdate = True
                elif entry not in dataDict:
                    # WFAU-created frames ("e20...") use the high ID range.
                    if os.path.basename(entry).startswith("e20"):
                        newMfID = _nextWfauMfID + nrWfau
                        nrWfau += 1
                    else:
                        newMfID = _nextMfID + nrCasu
                        nrCasu += 1

                    csvList.append(
                        (newMfID, self._cuEventID,
                         os.path.basename(os.path.dirname(entry)),
                         '%s%s' % (self.sysc.pixelServerHostName, entry)))

        if csvList:
            CSV.File(csvFileName, 'w').writelines(csvList)
            time.sleep(20)
            try:
                ingester = Ingester(self.archive,
                    schema.parseTables(_ingestSchema, [_tableName]))
            except schema.MismatchError as error:
                raise IngCuSession.IngCuError(error)

            IngestLogger.addMessage("Ingesting from %s" % csvFileName)
            ingester.ingestTable(_tableName, csvFileName,
                                 isCsv=True, deleteFile=False)
            ingester.commitTransaction()
            # Archive the ingested CSV into the ingested-files area.
            newPath = self.sysc.ingestedFilesPath(self.archive.sharePath())
            newLocation = os.path.join(
                newPath, os.path.basename(csvFileName))
            os.rename(ingester.sharePath(csvFileName),
                      newLocation)
            del ingester
            time.sleep(20)

            IngestLogger.addMessage(
                "FlatFileLookUp table updated with %s listed files." % len(csvList))
        elif directUpdate:
            self.archive.commitTransaction()
            IngestLogger.addMessage(
                "FlatFileLookUp table updated with %s files." % nrWfau)
        else:
            IngestLogger.addMessage("No new data to update.")
1034
1035
1036
def updateMfIDs(self, dailyFileListDict):
    """
    Write multiframe IDs into FlatFileLookUp.

    @param dailyFileListDict: Dictionary containing daily file lists.
    @type dailyFileListDict: dict

    """
    IngestLogger.addMessage("Updating multiframeIDs in FlatFileLookUp.")
    try:
        self._cuNum = 1
        self._connectToDb()
        # Trial runs get a sentinel event ID instead of a real one.
        self._cuEventID = \
            self._getNextCuEventID() if not self._isTrialRun else -9999

        self._outPrefix = self.createFilePrefix(
            self._cuNum, self._cuEventID, self._hostName,
            self.archive.database)

        csvFileName = self.archive.sharePath(
            "%s_%s.csv" % (self._outPrefix, 'FlatFileLookUp'))

        for dateKey in sorted(dailyFileListDict):
            for filePath in dailyFileListDict[dateKey]:
                self.createFileList(filePath)
                if not self._isTrialRun:
                    self.updateFlatFileLookUp(csvFileName)
                else:
                    IngestLogger.addMessage(
                        "<TEST> Update for " + filePath)
    finally:
        # Always release the database connection, even on failure.
        self._disconnectFromDb()
1070
1071
1072
1073
if __name__ == "__main__":
    # ------------------------------------------------------------------
    # Command-line interface.  Each option's help text states which CU(s)
    # it applies to; the full option set is shared by all CUs this driver
    # can run.
    # ------------------------------------------------------------------
    CLI.progArgs["comment"] = "Running CU0"
    CLI.progOpts += [
        CLI.Option('b', "begin",
                   "first date to process, eg. 20050101",
                   "DATE", str(IngCuSession.beginDateDef)),
        CLI.Option('e', "end",
                   "last date to process, eg. 20050131",
                   "DATE", str(IngCuSession.endDateDef)),
        CLI.Option('f', "force",
                   "CU1: transfer regardless any checks; "
                   "CU3: compress uncompressed FITS files; "
                   "CU4: process regardless any FlatFileIngest/CU3 checks."),
        CLI.Option('j', "janet",
                   "CU1: use JANET instead of UKLight"),
        CLI.Option('k', "disklist",
                   "CU2: list of RAID disk paths including the archive name, "
                   "eg. '/disk01/wsa,/disk02/wsa' (depending on given archive).",
                   "LIST", ''),
        CLI.Option('l', "xferlog",
                   "xferlog of files to be processed",
                   "LOGFILE", ''),
        CLI.Option('m', "writemfid",
                   "CU1: only update MfIDs in FlatFileLookUp table"),
        CLI.Option('n', "nomfid",
                   "CU1/3: don't write new MfID into FITS file"),
        CLI.Option('o', "outpath",
                   "directory where the output data is written to, "
                   "defaults to /mnt of given database", "PATH", ''),
        CLI.Option('p', "progorder",
                   "CU4: the order of processing the programmes (accepts "
                   "keywords 'other','others' for , all programmes not "
                   "explicitely named)", "LIST", ''),
        CLI.Option('r', "redo",
                   "CU3: enable overwriting of existing MfIDs, use only on a "
                   "single day basis! CU2: redo existing jpegs. CU4: Redo "
                   "monthly detection schema."),
        CLI.Option('s', "subdir",
                   "CU2/3: subdirectory containing FITS date directories",
                   "DIR", SystemConstants.fitsDir),
        CLI.Option('w', "casudisk",
                   "CU1: data disk at CASU (depending on given archive)",
                   "PATH"),
        CLI.Option('x', "cunums",
                   "csv list of CU numbers to run",
                   "LIST", ""),
        CLI.Option('y', "threads",
                   "CU1: number of threads: [a]b (a: daywise threads, "
                   "b: scp threads); CU2: number of processors to use; "
                   "CU4: number of processors to use (daywise split)",
                   "INT", '0'),
        CLI.Option('v', "version",
                   "version number of the data",
                   "STR", ''),
        CLI.Option('F', "ffluonly",
                   "CU1: don't transfer but update FlatfileLookup."),
        CLI.Option('G', "gesoptions",
                   "CU1: GES instrument and type, ie "
                   "g[iraffe]/u[ves],n[ightly],s[tacked],a[rchived]",
                   "STR", '', isValOK=lambda x: ',' in x \
                   and x.startswith(('g','u')) and x.endswith(('a', 'n', 's'))),
        CLI.Option('H', "getheader",
                   "Only transfer FITS headers instead of full file."),
        CLI.Option('K', "keepwork",
                   "Don't remove working dir."),
        CLI.Option('M', "mosaic",
                   "CU2: force making mosaics. Don't use it for large files!"),
        CLI.Option('P', "programmes", "CU3/CU4: only process data for given "
                   "programmes (accepts keywords 'all', 'ns' (non-survey), "
                   "'ukidss' (all 5 main surveys)); one programme prefixed "
                   "with 'x-' excludes this programme.", "LIST", 'all'),
        CLI.Option('R', "repromode",
                   "mode for reprocessed data: 'otm' (one-to-many dirs) "
                   "or 's::suffix' (CASU datedir suffix)."
                   "or 'v::version' (supply version if not given at CASU)",
                   "STR", ''),
        CLI.Option('S', "subset", "CU4: process subset of [Raw table,"
                   " Astrometry table, Photometry table]", "LIST",
                   "Raw,Astrometry,Photometry"),
        CLI.Option('T', "timecheck",
                   "CU1: check files against CASU file timestamps"),
        CLI.Option('X', "exclude",
                   "CU3/4: exclude given files/file patterns from processing",
                   "LIST", ""),
        CLI.Option('Z', "deprecation",
                   "CU1: modus for deprecating files: 'mv', 'rm', 'ip' "
                   "(leave in place); CU2: 'modus,deprecated FITS file list'",
                   "MODE", '')]

    # Parse the command line and log this invocation.
    cli = CLI(DataBuilder, "$Revision: 10233 $")
    IngestLogger.addMessage(cli.getProgDetails())
    # System constants are keyed on the archive name, i.e. the part of the
    # database argument after the last '.'.
    sysc = SystemConstants(cli.getArg("database").split('.')[-1])
    DataBuilder.casuDisk = cli.getOpt("casudisk")
    # Fall back to the archive's available RAID file systems when no
    # explicit disk list is supplied.
    DataBuilder.diskList = (cli.getOpt("disklist") if cli.getOpt("disklist")
                            else sysc.availableRaidFileSystem())

    # Resolve the begin/end dates and sanity-check their order; the order
    # check is skipped for GES archives.
    beginDate, endDate = sysc.obsCal.getDatesFromInput(cli.getOpt("begin"),
                                                       cli.getOpt("end"))
    if not sysc.isGES() and int(endDate) < int(beginDate):
        raise SystemExit("<ERROR> The given end date %s lies before the begin "
                         "date %s." % (cli.getOpt("end"), cli.getOpt("begin")))

    # Construct the builder from the parsed options (positional order must
    # match the DataBuilder initialiser) and run the requested CUs.
    builder = DataBuilder(cli.getOpt("cunums"),
                          cli.getOpt("curator"),
                          cli.getArg("database"),
                          cli.getOpt("begin"),
                          cli.getOpt("end"),
                          cli.getOpt("version"),
                          cli.getOpt("outpath"),
                          cli.getOpt("nomfid"),
                          cli.getOpt("redo"),
                          cli.getOpt("programmes"),
                          cli.getOpt("progorder"),
                          cli.getOpt("test"),
                          cli.getOpt("xferlog"),
                          cli.getOpt("timecheck"),
                          cli.getOpt("threads"),
                          cli.getOpt("writemfid"),
                          cli.getOpt("subdir"),
                          cli.getOpt("janet"),
                          cli.getOpt("force"),
                          cli.getOpt("exclude"),
                          cli.getOpt("deprecation"),
                          cli.getOpt("repromode"),
                          cli.getOpt("mosaic"),
                          cli.getOpt("subset"),
                          cli.getOpt("keepwork"),
                          cli.getOpt("ffluonly"),
                          cli.getOpt("getheader"),
                          cli.getOpt("gesoptions"),
                          cli.getArg("comment"))
    builder.run()
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255