1
2
3
4 """
5 Invoke CU1. Fetch science data from CASU without connecting to the database;
6 to update the database run cu1updatedb.py afterwards.
7
8 @author: E. Sutorius
9 @org: WFAU, IfA, University of Edinburgh
10
11 @newfield contributors: Contributors, Contributors (Alphabetical Order)
12 @contributors: I. Bond, R.S. Collins
13 """
14
15 from collections import defaultdict
16 import inspect
17 import math
18 import multiprocessing
19 from collections import namedtuple
20 import os
21 import time
22
23 from cu1Transfer import CASUTransfer, TransferCASUList, GESTransfer, getScpScriptName
24
25 from wsatools.CLI import CLI
26 import wsatools.DataFactory as df
27 import wsatools.DbConnect.DbConstants as dbc
28 from wsatools.DbConnect.DbSession import DbSession
29 from wsatools.File import File
30 from wsatools.FitsUtils import FitsList
31 from wsatools.DbConnect.IngCuSession import IngCuSession
32 from wsatools.DbConnect.IngIngester import IngestLogger as Logger, \
33 IngestLogFile
34 from wsatools.SystemConstants import SystemConstants
35 import wsatools.Utilities as utils
36
37
39
40
41
    # Number of worker processes; run() cuts the date list into this many
    # chunks and reads one result per process from the queue.
    numprocessors = 2
    # Running download status. finaliseTransfer() multiplies each date's
    # status into it, so it starts at the dbc.yes() value.
    DLStatus = dbc.yes()
44
45
46
48 """
49 Run the transfers.
50
51 @param params: Parameters for CASUTransfer class.
52 @type params: namedtuple
53 @param procDates: The dates to transfer in parallel.
54 @type procDates: list
55 @param resultQueue: The results queue.
56 @type resultQueue: Queue object
57
58 """
59 outDict = defaultdict(list)
60 for date in procDates:
61 transfer = CASUTransfer(
62 scpMainDir=params.scpMainDir,
63 scpThreads=params.scpThreads,
64 checkTimeFlag=params.checkTimeFlag,
65 checkMd5Flag=params.checkMd5Flag,
66 UKLight=params.UKLight,
67 forceTransfer=params.forceTransfer,
68 deprecationMode=params.deprecationMode,
69 database=params.database,
70 outPrefix=params.outPrefix,
71 reproMethod=params.reproMethod,
72 reproVersStr=params.reproVersStr,
73 ffluOnly=params.ffluOnly,
74 onlyHeaderTransfer=params.onlyHeaderTransfer,
75 isTestRun=params.isTestRun)
76 outDict[int(date)] = transfer.transfer(int(date), [])
77 resultQueue.put(outDict)
78
79
80
81 - def run(self, dateList, xferParams):
82 """
83 Thread the downloads.
84
85 @param dateList: The dates to transfer in parallel.
86 @type dateList: list
87 @param xferParams: Parameters for CASUTransfer class.
88 @type xferParams: namedtuple
89
90 @return: Dictionary containing metadata of the daily transfers.
91 @rtype: dict
92 """
93 outQueue = multiprocessing.Queue()
94 chunksize = int(math.ceil(len(dateList) / float(self.numprocessors)))
95 procs = []
96 for i in xrange(self.numprocessors):
97 if i > 0:
98 time.sleep(60)
99 p = multiprocessing.Process(
100 target=self.xferThreads,
101 args=(xferParams, dateList[chunksize * i:chunksize * (i + 1)],
102 outQueue))
103 procs.append(p)
104 p.start()
105
106
107
108 resultDict = defaultdict(list)
109 for i in xrange(self.numprocessors):
110 resultDict.update(outQueue.get())
111
112
113 for p in procs:
114 p.join()
115
116
117 return resultDict
118
119
120
122 """
123 Process the results.
124
125
126 @return: Dictionary containing metadata of the daily transfers.
127 @rtype: dict
128
129 """
130 allTransferLogs = defaultdict(list)
131 fullTransferList = []
132 fullDeprFileList = []
133
134 for date in resultDict:
135 try:
136
137
138
139 transferLog, xferStatus, logText, dateList, \
140 transferList, deprFileList = resultDict[date]
141
142 allTransferLogs[date].append(transferLog)
143 fullTransferList.extend(transferList)
144 fullDeprFileList.extend(deprFileList)
145
146 if xferStatus < 0:
147 status = -1
148 elif xferStatus:
149 status = dbc.yes()
150 else:
151 status = dbc.no()
152
153
154 Cu1.dateSet |= set(dateList)
155
156
157 Logger.addMessage("File transfer finished with status = "
158 + str(Cu1.DLStatus))
159 Logger.addMessage("Data Transfer messages:\n")
160 Logger.addMessage(logText.getvalue())
161 Logger.addMessage("Files downloaded are in: " + transferLog)
162 except Exception as error:
163
164 Logger.addExceptionDetails(error)
165
166
167 self.DLStatus *= status
168
169
170 Cu1.DLStatus *= self.DLStatus
171 return allTransferLogs, fullTransferList, fullDeprFileList
172
173
174
class Cu1(IngCuSession):
    """Download data from CASU using the module cu1Transfer.
    """
    # Download status of the whole run (class level).
    DLStatus = 1
    # Date entries returned by the transfers (class level, shared set).
    dateSet = set()
    # Bundle of the keyword parameters of CASUTransfer, in the order in
    # which the fields are filled when the transfer is set up.
    XferParams = namedtuple("XferParams", [
        "scpMainDir",
        "scpThreads",
        "checkTimeFlag",
        "checkMd5Flag",
        "UKLight",
        "forceTransfer",
        "deprecationMode",
        "database",
        "outPrefix",
        "reproMethod",
        "reproVersStr",
        "ffluOnly",
        "onlyHeaderTransfer",
        "isTestRun",
    ])
181
182 - def __init__(self,
183 curator=CLI.getOptDef("curator"),
184 database=DbSession.database,
185 runDate=CLI.getOptDef("date"),
186 versionStr=CLI.getOptDef("version"),
187 casuDisk=CLI.getOptDef("casudisk"),
188 timeCheck=CLI.getOptDef("timecheck"),
189 numThreads=CLI.getOptDef("scpthreads"),
190 janet=CLI.getOptDef("janet"),
191 deprecation=CLI.getOptDef("deprecation"),
192 xferList=CLI.getOptDef("xferlog"),
193 isTrialRun=DbSession.isTrialRun,
194 forceXfer=CLI.getOptDef("force"),
195 reproMode=CLI.getOptDef("repromode"),
196 ffluOnly=CLI.getOptDef("ffluonly"),
197 onlyHeader=CLI.getOptDef("getheader"),
198 gesOptions=CLI.getOptDef("gesoptions"),
199 comment=CLI.getArgDef("comment")):
200 """
201 @param casuDisk: Disk at CASU where the data is stored.
202 @type casuDisk: str
203 @param comment: Descriptive comment as to why curation task is
204 being performed.
205 @type comment: str
206 @param curator: Name of curator.
207 @type curator: str
208 @param deprecation: Modus for deprecating files.
209 @type deprecation: str
210 @param ffluOnly: Don't transfer but update FlatfileLookup table.
211 @type ffluOnly: bool
212 @param forceXfer: Force the data transfer.
213 @type forceXfer: bool
214 @param gesOptions: GES instrument and type.
215 @type gesOptions: str
216 @param isTrialRun: If True, do not perform database modifications.
217 @type isTrialRun: bool
218 @param janet: Use JANET instead of UKLight.
219 @type janet: bool
220 @param numThreads: Number of scp transfer threads.
221 @type numThreads: int
222 @param onlyHeader: Only transfer FITS headers.
223 @type onlyHeader: bool
224 @param reproMode: Mode for reprocessed data: otm (one-to-many dirs)
225 or s::suffix (CASU datedir suffix).
226 @type reproMode: str
227 @param runDate: Dates to process, eg. [20050101].
228 @type runDate: list(int)
229 @param timeCheck: Determines if existing files should be checked for
230 their timestamp against the same file at CASU;
231 @type timeCheck: bool
232 @param versionStr: Version number of the data.
233 @type versionStr: str
234 @param xferList: List containing files to be downloaded.
235 @type xferList: list
236
237 """
238 super(Cu1, self).attributesFromArguments(
239 inspect.getargspec(Cu1.__init__)[0], locals())
240
241
242 super(Cu1, self).__init__(cuNum=1,
243 curator=self.curator,
244 comment=self.comment,
245 reqWorkDir=False,
246 database=self.database,
247 autoCommit=False,
248 isTrialRun=self.isTrialRun)
249
250 self.csvSharePath = self.sysc.dbSharePath()
251
252
253 self._initFromDB()
254
255
256
257 self._md5Check = False
258
259
260 self._noData = False
261
262 if not self.deprecation:
263 self.deprecation = "mv"
264
265
266 self.transferList = []
267 self.deprFileList = []
268 self.transferLog = []
269 self.allXferLogs = defaultdict(list)
270
271 if self.gesOptions:
272 self.gesInstrument = ("giraffe" if self.gesOptions.startswith('g')
273 else "uves")
274 self.gesMode = {'s': "stacked", 'n': "nightly", 'a': "archived"}[
275 self.gesOptions[-1]]
276
277
278
307
308
309
        """
        Do CU1.

        Creates the CU1 lock file, runs the transfer variant selected by the
        reprocessing mode and survey (TransferCASUList for 'otm',
        GESTransfer for GES, CASUTransfer otherwise), writes a CU01ED marker
        file for every entry of the date set, passes the curation histories
        to the ingest log file, dumps the log and removes the lock again.
        """
        # The lock file lives in the system path and is named after the
        # load database.
        self.lockFile = File(os.path.join(SystemConstants.sysPath,
                                          "lockCU1_%s" % self.sysc.loadDatabase))

        # _createLock() raises SystemExit if the lock already exists. As
        # SystemExit is not an Exception subclass the bare except is what
        # lets the notice be printed before the error is re-raised.
        try:
            self._createLock()
        except:
            print '\n'.join(["\n This code will not run,",
                             " you don't have a key,",
                             " and a lock exists. \n"])
            raise

        # The conditional expression only selects the second tuple element:
        # "runDate,reproMode" if a repro mode is set, else just the runDate.
        Logger.addMessage("Started CU %d for: %s" % (
            self.cuNum, ','.join([str(self.runDate), self.reproMode])
            if self.reproMode else str(self.runDate)))

        # Logger instance for the log file of this run; its pathName is
        # used for the history entry and for the final dump.
        _logFile = Logger(self.createLogFileName())

        # Set to True only in the multi-process branch below, where no
        # single xferStatus/logText exists.
        multiThreaded = False
        try:
            # Prefix shared by the output files of this run; built from the
            # CU number, CU event ID, host name and the database name (the
            # part after the last '.').
            self._outPrefix = self.createFilePrefix(
                self.cuNum, self._cuEventID, self._hostName,
                self.database.rpartition('.')[2])

            # Ingest log file that receives the curation histories.
            ingestLogFile = IngestLogFile(os.path.join(
                self.dbMntPath, self._outPrefix + ".log"))

            # Select the transfer variant.
            if 'otm' in self.reproMode:
                # Reprocessed data, one-to-many directories: transfer from
                # a list.
                TransferCASUList.sysc = self.sysc
                casuTransferList = TransferCASUList(
                    fitsList=FitsList(self.sysc),
                    casuDir=self.casuDisk,
                    xferList=self.xferList,
                    forceTransfer=self.forceXfer,
                    deprecationMode=self.deprecation,
                    database='.'.join([self.sysc.loadServer,
                                       self.sysc.loadDatabase]),
                    UKLight=not self.janet,
                    outPrefix=self._outPrefix,
                    isTestRun=self.isTrialRun)

                self.transferLog, xferStatus, dateList, self.deprFileList = \
                    casuTransferList.transfer()
                self.dateSet |= set(dateList)
            elif self.sysc.isGES():
                # GES data: only the first run date is transferred.
                GESTransfer.sysc = self.sysc

                casuGesTransferList = GESTransfer(
                    instrument=self.gesInstrument,
                    mode=self.gesMode,
                    casuDir=self.casuDisk,
                    xferList=self.xferList,
                    versStr=self.versionStr,
                    forceTransfer=self.forceXfer,
                    database='.'.join([self.sysc.loadServer,
                                       self.sysc.loadDatabase]),
                    UKLight=not self.janet,
                    outPrefix=self._outPrefix,
                    isTestRun=self.isTrialRun)
                self.transferLog, xferStatus, dateList = \
                    casuGesTransferList.transfer(self.runDate[0])
                self.dateSet |= set(dateList)
                self.allXferLogs[self.runDate[0]].append(self.transferLog)
            else:
                # Standard CASU transfer. The parameters are bundled so the
                # same set can be handed to the worker processes.
                CASUTransfer.sysc = self.sysc
                xferParams = self.XferParams(
                    self.casuDisk, self.numThreads, self.timeCheck,
                    self._md5Check, not self.janet, self.forceXfer,
                    self.deprecation,
                    '.'.join([self.sysc.loadServer, self.sysc.loadDatabase]),
                    self._outPrefix, self.reproMode,
                    # reproVersStr: None without a repro mode; with "v::" in
                    # the mode the text after '::' is appended to the version.
                    ((','.join([self.versionStr,
                                self.reproMode.partition('::')[2]])
                      if "v::" in self.reproMode else self.versionStr)
                     if self.reproMode else None),
                    self.ffluOnly, self.onlyHeader, self.isTrialRun)

                if self.xferList:
                    # A transfer list was given: single transfer in this
                    # process for the first run date.
                    casuTransfer = CASUTransfer(
                        scpMainDir=xferParams.scpMainDir,
                        scpThreads=xferParams.scpThreads,
                        checkTimeFlag=xferParams.checkTimeFlag,
                        checkMd5Flag=xferParams.checkMd5Flag,
                        UKLight=xferParams.UKLight,
                        forceTransfer=xferParams.forceTransfer,
                        deprecationMode=xferParams.deprecationMode,
                        database=xferParams.database,
                        outPrefix=xferParams.outPrefix,
                        reproMethod=xferParams.reproMethod,
                        reproVersStr=xferParams.reproVersStr,
                        ffluOnly=xferParams.ffluOnly,
                        onlyHeaderTransfer=xferParams.onlyHeaderTransfer,
                        isTestRun=xferParams.isTestRun)

                    self.transferLog, xferStatus, logText, dateList, \
                        self.transferList, self.deprFileList = \
                        casuTransfer.transfer(int(self.runDate[0]),
                                              self.xferList)
                    self.dateSet |= set(dateList)
                    self.allXferLogs[self.runDate[0]].append(self.transferLog)
                else:
                    # No transfer list: all run dates are transferred in
                    # parallel processes; status and messages are handled
                    # by finaliseTransfer().
                    multiThreaded = True
                    threadDLs = ThreadDL()
                    resDict = threadDLs.run(self.runDate, xferParams)
                    self.allXferLogs, self.transferList, self.deprFileList = \
                        threadDLs.finaliseTransfer(resDict)
                    del threadDLs

                    # Keep the scp scripts under a name made of the load
                    # database, the date index and the thread index; scripts
                    # that do not exist are skipped silently.
                    # NOTE(review): only numThreads % 10 scripts per date are
                    # renamed, presumably because the target name has a
                    # single digit per index - confirm.
                    keepScpName = os.path.join(self.sysc.sysPath,
                                               "scpcam%s_%01d%01d")
                    for i, dateStr in enumerate(self.runDate):
                        for j in xrange(self.numThreads % 10):
                            try:
                                os.rename(
                                    getScpScriptName(self.sysc, dateStr, j),
                                    keepScpName % (self.sysc.loadDatabase,
                                                   i, j))
                            except OSError:
                                pass

            # Write a time-stamped CU01ED_<database> marker file into every
            # entry of the date set (entries are used as directory paths),
            # except in trial runs and FlatfileLookup-only runs.
            for entry in sorted(self.dateSet):
                cuedFile = File(os.path.join(
                    entry, "CU01ED_" + self.database.rpartition('.')[2]))
                if not self.isTrialRun and not self.ffluOnly:
                    cuedFile.wopen()
                    cuedFile.writetheline(utils.makeTimeStamp())
                    cuedFile.close()

            if not multiThreaded:
                # Translate the transfer status into the download status:
                # negative -> -1, true -> dbc.yes(), otherwise dbc.no().
                if xferStatus < 0:
                    self.DLStatus = -1
                elif xferStatus:
                    self.DLStatus = dbc.yes()
                else:
                    self.DLStatus = dbc.no()

                # Report; logText exists only for the plain CASU transfer.
                Logger.addMessage("File transfer finished with status = "
                                  + str(self.DLStatus))
                if 'otm' not in self.reproMode and not self.sysc.isGES():
                    Logger.addMessage("Data Transfer messages:\n")
                    Logger.addMessage(logText.getvalue())
                Logger.addMessage("Files downloaded are in: " \
                                  + self.transferLog)

            # Collect the curation history information.
            timestamp = utils.makeMssqlTimeStamp()
            histories = {}

            # "DCH": the sorted entries of the date set.
            histories.setdefault("DCH", []).extend(sorted(self.dateSet))

            # "ACH": one row describing this CU event.
            histories.setdefault("ACH", []).append(
                [self._cuEventID, self.cuNum, _logFile.pathName,
                 dbc.charDefault(), timestamp, self.curator,
                 self.comment, dbc.yes()])

            # "PCH": the download status.
            histories.setdefault("PCH", []).append([self.DLStatus])
            if self.isTrialRun:
                # Trial run: only show what would have been written.
                for entry in histories:
                    print "%s: %r" % (entry, histories[entry])
            else:
                ingestLogFile.pickleWrite([], {}, histories)
                Logger.addMessage(''.join(["Curation History info written to ",
                                           ingestLogFile.name]))

        except Exception as error:
            # Any failure is logged; the log dump and lock removal below
            # still run.
            Logger.addExceptionDetails(error)
        else:
            Logger.addMessage("Transfer finished.")
            Logger.addMessage("Please, run ./IngestCUFiles.py -i -r %s %s"
                              % (ingestLogFile.name, self.database))
            Logger.addMessage("Log written to " + _logFile.pathName)

        # Append the collected messages to the log file and reset the logger.
        # NOTE(review): the file object is never closed explicitly.
        Logger.dump(file(_logFile.pathName, 'a'))
        Logger.reset()

        # Release the lock.
        # NOTE(review): not reached if something that is not an Exception
        # subclass (e.g. KeyboardInterrupt) escapes the try block above.
        self.lockFile.remove()
505
506
507
509 """
510 Create a lock, writing info to sysLog. If lock exists, abort.
511 """
512 lockTime = utils.makeTimeStamp()
513 if self.lockFile.exists():
514 Logger.addMessage(''.join(
515 ["# ", lockTime,
516 " UTC: Lock file %s exists. scp_camb aborted." % \
517 self.lockFile.name]))
518 raise SystemExit
519 else:
520 self.lockFile.wopen()
521 self.lockFile.writetheline(
522 "%s [%d] \n" % (lockTime, os.getpid()))
523 self.lockFile.close()
524
525
526
527
if __name__ == '__main__':
    # Command-line options specific to CU1, added to the standard ones.
    CLI.progOpts += [
        CLI.Option('b', "date", "first date to process, eg. 20050101",
                   "DATE", IngCuSession.beginDateDef),
        CLI.Option('v', "version", "version number of the data",
                   "STR", '1'),
        CLI.Option('l', "xferlog", "xferlog of files to be transferred",
                   "LOGFILE", ''),
        CLI.Option('w', "casudisk", "data disk at CASU",
                   "PATH", ''),
        CLI.Option('y', "scpthreads", "number of scp threads",
                   "INT", SystemConstants.scpThreads),
        CLI.Option('j', "janet", "if True, use JANET instead of UKLight"),
        CLI.Option('f', "force", "if True, transfer regardless any checks"),
        CLI.Option('F', "ffluonly",
                   "CU1: don't transfer but update FlatfileLookup."),
        CLI.Option('G', "gesoptions",
                   "GES instrument and type, ie "
                   "g[iraffe]/u[ves],n[ightly],s[tacked]",
                   "STR", ''),
        CLI.Option('H', "getheader", "Only transfer FITS headers."),
        CLI.Option('T', "timecheck",
                   "check files against CASU file timestamps"),
        CLI.Option('R', "repromode",
                   "mode for reprocessed data: 'otm' "
                   "(one-to-many dirs) or 's::suffix' (CASU "
                   "datedir suffix).",
                   "STR", 'none'),
        CLI.Option('Z', "deprecation",
                   "modus for deprecating files: 'mv', 'rm', 'ip'"
                   " (leave in place)",
                   "STR[2]", "mv"),
    ]

    cli = CLI(Cu1, "$Revision: 9970 $")
    Logger.addMessage(cli.getProgDetails())

    # Every constructor argument is named; the values are read in the
    # order of the constructor's parameters.
    cu1 = Cu1(curator=cli.getOpt("curator"),
              database=cli.getArg("database"),
              runDate=cli.getOpt("date"),
              versionStr=cli.getOpt("version"),
              casuDisk=cli.getOpt("casudisk"),
              timeCheck=cli.getOpt("timecheck"),
              numThreads=cli.getOpt("scpthreads"),
              janet=cli.getOpt("janet"),
              deprecation=cli.getOpt("deprecation"),
              xferList=cli.getOpt("xferlog"),
              isTrialRun=cli.getOpt("test"),
              forceXfer=cli.getOpt("force"),
              reproMode=cli.getOpt("repromode"),
              ffluOnly=cli.getOpt("ffluonly"),
              onlyHeader=cli.getOpt("getheader"),
              gesOptions=cli.getOpt("gesoptions"),
              comment=cli.getArg("comment"))
    cu1.run()
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609