
Source Code for Module invocations.cu29.cu29

#! /usr/bin/env python
#------------------------------------------------------------------------------
#$$
"""
   Invoke CU29. Ingests internal catalogues created by survey teams for
   specific programmes and then matches these to the main source table.
   This is not the same as CU16, which matches external surveys to VDFS
   programmes. Two types of join to the source table are created. If the
   table has a one-to-one match, then a lookup table from the new catalogue
   ID to the sourceID is created. If it is one-to-many, then a neighbour
   table is used.

   Ingest internal catalogues and create neighbour and lookup join tables
   for a given programme.

   @author: N.J.G. Cross
   @org:    WFAU, IfA, University of Edinburgh
"""
#------------------------------------------------------------------------------
from   __future__ import division, print_function
import os
from   collections import defaultdict, namedtuple

from   wsatools.CLI                 import CLI
from   wsatools.DbConnect.CuSession import CuSession
import wsatools.DbConnect.DbConstants   as dbc
from   wsatools.DbConnect.DbSession import Ingester
import wsatools.DbConnect.Schema        as schema
from   wsatools.Logger              import Logger
#------------------------------------------------------------------------------
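# Illustrative notes (not part of the pipeline; the column names below are
# assumptions, not the archive's actual schema):
#
#   one-to-one match  -> lookup table,    e.g. (catalogueID, sourceID)
#   one-to-many match -> neighbour table, e.g. (masterObjID, slaveObjID,
#                                               distanceMins)
#
# Example invocation, using the default 'LAS' programmeID argument that is
# registered at the bottom of this script (further options come from the
# shared wsatools CLI class):
#
#   python cu29.py LAS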
class Cu29(CuSession):
    """ Ingest internal catalogues and create neighbour and lookup join tables
        for a given programme.
    """
    #--------------------------------------------------------------------------
    # Class parameters - these should not be altered
    cuNum = 29            # Overrides CuSession default
    _autoCommit = True    # Overrides CuSession default
    _isPersistent = True  # Overrides CuSession default

    #--------------------------------------------------------------------------
    # Public member variables (with defaults) - set from command-line options

    #--------------------------------------------------------------------------
    # Private member variables - initialised by class

    #: A DbSession.Outgester object for performing outgests.
    _outgester = None

    #--------------------------------------------------------------------------
    def _onRun(self):
        """ Ingest internal catalogues and create neighbour and lookup join
            tables.
        """
        # First check whether there are any internal products.
        # Use ExternalProduct - these catalogues are internal to the survey,
        # but created externally to VDFS.
        externCatInfo = self.archive.query("*", "ExternalProduct",
                                           "programmeID=%s" % self.programmeID)

        # ExternalProduct columns:
        #   programmeID | productType | description | institution | creator |
        #   releaseNum | directory | fileNameRoot | zeroPoint | zpSystem |
        #   sqlSchemaFile | tableName | sourceMatched | reference | ingested
        self.areVariableTypes(externCatInfo)
        for externCat in externCatInfo:
            # Has the data been ingested? If not, ingest it.
            self.ingestCats(externCat)

        for externCat in externCatInfo:
            # Match to the source table
            self.matchCats(externCat)

        # Finished

    #--------------------------------------------------------------------------
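    # Note on the query in _onRun above: self.archive.query is assumed to
    # return one row object per ExternalProduct entry, with the listed
    # columns exposed as attributes (e.g. externCat.tableName,
    # externCat.sqlSchemaFile, externCat.sourceMatched), which is how the
    # methods below access them.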
    def areVariableTypes(self, externCatInfo):
        """ Checks whether there is a set of variables tables that should be
            merged into a variable-types table.
        """
        # Find possible variables tables
        posVarTables = [externCat.tableName for externCat in externCatInfo
                        if 'variables' in externCat.tableName.lower()]

        # Find the main table for each, using foreign-key constraint names
        self.secToMainDict = {}
        for tableName in posVarTables:
            schemaFile = [externCat.sqlSchemaFile
                          for externCat in externCatInfo
                          if externCat.tableName == tableName].pop()
            table = schema.parseTables(schemaFile, [tableName])[0]
            for constraint in table.constraints:
                # e.g. fk_Provenance_combiframeID_to_Multiframe_multiframeID
                # possibly have to make this a tougher constraint.
                if constraint.name.startswith("fk_"):
                    # Parse out the referenced (main) table name
                    mainTableName = constraint.name.split('_')[4]
                    self.secToMainDict[tableName] = mainTableName
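        # Worked example of the constraint-name parse above, using the name
        # quoted in the comment (illustrative only):
        #   "fk_Provenance_combiframeID_to_Multiframe_multiframeID".split('_')
        #     -> ['fk', 'Provenance', 'combiframeID', 'to',
        #         'Multiframe', 'multiframeID']
        #   element [4] -> 'Multiframe', the main table name.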
    #--------------------------------------------------------------------------
    def ingestCats(self, externCat):
        """ Checks whether the catalogues have been ingested and ingests any
            that have not been.
        """
        # @FIXME: Sort out defaults - mainly correcting defaults in PSF cats
        nonIngestedFiles = self.archive.query(
            "fieldID,fileName,versNum",
            "ExternalProductCatalogue",
            "programmeID=%s and productType='%s' and isIngested=0"
            % (self.programmeID, externCat.productType))

        if len(nonIngestedFiles) > 0:
            Logger.addMessage("%s %s files to ingest..." %
                              (len(nonIngestedFiles), externCat.productType))
            ecSchema = schema.parseTables(externCat.sqlSchemaFile)
            try:
                ingester = Ingester(self.archive, ecSchema, self.shareFileID)
            except schema.MismatchError as error:
                raise Cu29.CuError(error)

            for niFile in nonIngestedFiles:
                mainTable = self.secToMainDict.get(externCat.tableName)
                csvFileNames = self.createIngestableFile(niFile, externCat,
                                                         mainTable)
                for csvFileName, tableName in csvFileNames:
                    ingester.ingestTable(tableName, csvFileName, isCsv=True)

                self.archive.update("ExternalProductCatalogue",
                    [("isIngested", 1)],
                    where="programmeID=%s and productType='%s' and "
                          "fileName='%s'" % (self.programmeID,
                          externCat.productType, niFile.fileName))
        else:
            Logger.addMessage("No files need to be ingested!")
    #--------------------------------------------------------------------------
    def createIngestableFile(self, notIngFile, externCat, mainTableName):
        """ Create a CSV file that can be ingested.
        """
        externFileData = open(os.path.join(self.sysc.extProdDir, 'teamCats',
            externCat.directory, notIngFile.fileName)).readlines()

        tableSchema = schema.parseTables(externCat.sqlSchemaFile,
                                         [externCat.tableName])[0]
        externFileHeaderInfo = self.parseHeader(externFileData)
        mainID = [column.name for column in tableSchema.columns
                  if 'id' in column.name.lower() and '--/C' in column.tag
                  and column.tag['--/C'] == 'meta.id;meta.main'][0]
        if mainTableName:
            currentID = self.archive.query("max(%s)" % mainID, mainTableName,
                                           firstOnly=True, default=0) + 1
            varType = externCat.tableName.lower().replace(
                self.programme.getAcronym().lower(), '').replace(
                'variables', '')
            mainOutputLines = []
        else:
            currentID = self.archive.query("max(%s)" % mainID,
                externCat.tableName, firstOnly=True, default=0) + 1

        outputLines = []
        for line in externFileData:
            if not line.startswith('#'):
                parts = self.breakIntoParts(line, externFileHeaderInfo)
                outData = []
                if mainTableName:
                    mainOutData = []
                for column in tableSchema.columns:
                    if ('id' in column.name.lower() and '--/C' in column.tag
                            and column.tag['--/C'] == 'meta.id;meta.main'):
                        outData.append(currentID)
                        if mainTableName:
                            mainOutData.append(currentID)
                        currentID += 1
                    elif column.name == 'fieldID':
                        outData.append(notIngFile.fieldID)
                    elif column.name == 'cuEventID':
                        outData.append(self.cuEventID)
                        if mainTableName:
                            mainOutData.append(self.cuEventID)
                    elif ('--/F' in column.tag and column.tag['--/F'].lower()
                            in externFileHeaderInfo.nameDict):
                        value = parts[externFileHeaderInfo.nameDict[
                            column.tag['--/F'].lower()]]
                        if (column.tag['--/F'].lower() in
                                externFileHeaderInfo.defaultDict and
                                value == externFileHeaderInfo.defaultDict[
                                    column.tag['--/F'].lower()]):
                            # @TODO: This doesn't work when they have
                            # near-default values, e.g. -99.98 not -99.999
                            value = str(column.getDefaultValue())
                        if value == '':
                            value = str(column.getDefaultValue())
                        outData.append(value)
                        if mainTableName and column.name in ('ra', 'dec'):
                            mainOutData.append(value)
                    else:
                        outData.append(column.getDefaultValue())
                if mainTableName:
                    mainOutData.append(varType)
                    mainOutputLines.append(
                        ','.join(map(str, mainOutData)) + '\n')
                outputLines.append(','.join(map(str, outData)) + '\n')

        outputFiles = []
        if mainTableName:
            ingestFileName = self.archive.sharePath("%s_%s.csv" %
                (mainTableName, self.shareFileID))
            open(ingestFileName, 'w').writelines(mainOutputLines)
            outputFiles.append((ingestFileName, mainTableName))

        ingestFileName = self.archive.sharePath("%s_%s.csv" %
            (externCat.tableName, self.shareFileID))
        open(ingestFileName, 'w').writelines(outputLines)
        outputFiles.append((ingestFileName, externCat.tableName))
        return outputFiles
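        # Summary of the column mapping above (hedged; the --/C and --/F
        # tags are schema annotations read via wsatools.DbConnect.Schema):
        # a column tagged --/C meta.id;meta.main receives a freshly
        # allocated running ID; a column whose --/F tag names a catalogue
        # column takes that value, falling back to column.getDefaultValue()
        # when the value is empty or equals the header's DEF= marker; all
        # other columns get schema defaults.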
    #--------------------------------------------------------------------------
    def breakIntoParts(self, line, externFileHeaderInfo):
        """ Breaks a catalogue line into its component parts.
        """
        parts = []
        if externFileHeaderInfo.isFixedLength:
            for ii in range(len(externFileHeaderInfo.nameDict)):
                partName = [name for name in externFileHeaderInfo.nameDict
                            if externFileHeaderInfo.nameDict[name] == ii][0]
                parts.append(' '.join(
                    line[externFileHeaderInfo.startEndDict[partName][0] - 1:
                         externFileHeaderInfo.startEndDict[partName][1]]
                    .split()))
        else:
            parts = line.split()

        return parts
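        # Worked example for a fixed-length row (hypothetical layout): with
        # startEndDict = {'ra': [1, 10]}, the slice taken above is
        # line[0:10], i.e. the first ten characters of the row,
        # whitespace-normalised via ' '.join(....split()).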
    #--------------------------------------------------------------------------
    def parseHeader(self, fileData):
        """ Parse the file header to extract the column information.
        """
        Header = namedtuple("Header",
                            "isFixedLength nameDict startEndDict defaultDict")
        headerLines = [line for line in fileData if line.startswith('#')]
        isFixedLength = False
        nameDict = {}
        defaultDict = {}
        startEndDict = None
        if headerLines[0].startswith('# ASCII'):
            for ii, line in enumerate(headerLines):
                if ii > 0 and len(line) >= 3:
                    name = line.split()[1].lower()
                    nameDict[name] = ii - 1
                    if 'DEF=' in line:
                        defaultDict[name] = line.split('DEF=')[1].split()[0]
        elif headerLines[0].startswith('# FIXED LENGTH'):
            isFixedLength = True
            startEndDict = defaultdict(list)
            for ii, line in enumerate(headerLines):
                if ii > 0 and len(line) >= 3 and len(line.split()) > 1:
                    name = line.split()[1].lower()
                    nameDict[name] = ii - 1
                    startEndDict[name] = [int(point) for point
                                          in line.split()[2].split('-')]
                    if 'DEF=' in line:
                        defaultDict[name] = line.split('DEF=')[1].split()[0]

        return Header(isFixedLength, nameDict, startEndDict, defaultDict)
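        # Illustrative header layouts this parser accepts (a sketch only;
        # real team-catalogue headers may carry extra fields):
        #   # ASCII
        #   # ra   DEF=-99.999999
        #   # dec  DEF=-99.999999
        # or, for fixed-width catalogues:
        #   # FIXED LENGTH
        #   # ra   1-10   DEF=-99.999999
        #   # dec  11-20  DEF=-99.999999
        # Column order (ASCII) or start-end character ranges (fixed length)
        # are taken from the header line positions parsed above.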
    #--------------------------------------------------------------------------
    def matchCats(self, externCat):
        """ Match to source table - either directly or with neighbour table.
        """
        # @TODO: Maybe all should be neighbour tables?
        self.setProgrammeTable(externCat)
        self.setupSchema(externCat)
        # CU16 will automatically match

    #--------------------------------------------------------------------------
    def setProgrammeTable(self, externCat):
        """ Set the entry in ProgrammeTable if it does not already exist.
        """
        tableName = (self.secToMainDict[externCat.tableName]
                     if externCat.tableName in self.secToMainDict
                     else externCat.tableName)
        if self.archive.queryEntriesExist(
                "ProgrammeTable", "programmeID=%s and tableName='%s'" %
                (self.programmeID, tableName)):
            return

        tableSchema = schema.parseTables(externCat.sqlSchemaFile,
                                         [tableName])[0]
        mainID = [column.name for column in tableSchema.columns
                  if 'id' in column.name.lower() and '--/C' in column.tag and
                  column.tag['--/C'] == 'meta.id;meta.main'][0]
        possibleRaCols = [column.name for column in tableSchema.columns
                          if '--/C' in column.tag and
                          column.tag['--/C'] == 'pos.eq.ra']
        possibleDecCols = [column.name for column in tableSchema.columns
                           if '--/C' in column.tag and
                           column.tag['--/C'] == 'pos.eq.dec']
        if len(possibleRaCols) == 1 and len(possibleDecCols) == 1:
            raCol = possibleRaCols.pop()
            decCol = possibleDecCols.pop()
        elif len(possibleRaCols) == 0 or len(possibleDecCols) == 0:
            raise Cu29.CuError("No position column or missing UCD info in "
                               "table %s" % tableName)
        else:
            # Find the main positional columns
            mainRaCols = [name for name in possibleRaCols if name == 'ra']
            mainDecCols = [name for name in possibleDecCols if name == 'dec']
            if len(mainRaCols) == 1 and len(mainDecCols) == 1:
                raCol = mainRaCols.pop()
                decCol = mainDecCols.pop()
            else:
                raise Cu29.CuError("No clear positional columns in table %s"
                                   % tableName)

        if raCol != 'ra' or decCol != 'dec':
            raise Cu29.CuError("Main positional columns should be named ra "
                               "and dec in %s" % tableName)

        maxTableID = self.archive.queryAttrMax("tableID", "ProgrammeTable",
            "programmeID=%s" % self.programmeID) or 0

        rowData = [self.programmeID, maxTableID + 1, tableName, mainID]
        self.archive.insertData("ProgrammeTable", rowData, enforcePKC=True)
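        # The UCD-driven selection above, illustrated with a hypothetical
        # schema: a column tagged --/C meta.id;meta.main is taken as the
        # main identifier; columns tagged pos.eq.ra / pos.eq.dec are the
        # positional candidates, resolved to the ones literally named 'ra'
        # and 'dec' when there is more than one candidate.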
    #--------------------------------------------------------------------------
    def setupSchema(self, externCat):
        """ Create the schema from the template.
        """
        # If not source-matched: create a RequiredNeighbours entry and parse
        # the schema; else: create a RequiredLookUp table entry.
        tableName = (self.secToMainDict[externCat.tableName]
                     if externCat.tableName in self.secToMainDict
                     else externCat.tableName)
        pTable = self.archive.query("*", "ProgrammeTable",
            "programmeID=%s AND tableName='%s'" % (self.programmeID,
            tableName), firstOnly=True)
        if externCat.sourceMatched:
            # Lookup table: RequiredLookup...
            # @TODO: Still needs to be completed
            pass
        else:
            # Check to see if the entry is already in there
            joinCriterion = 1. / 3600.  # 1 arcsecond
            neighbourTableName = '%sX%s' % (self.programme.getSourceTable(),
                pTable.tableName.replace(
                    self.programme.getAcronym().lower(), ''))
            if not self.archive.queryEntriesExist("RequiredNeighbours",
                    "programmeID=%s AND tableID=2 AND surveyID=%s AND "
                    "extTableID=%s AND extProgID=%s" % (self.programmeID,
                    dbc.intDefault(), pTable.tableID, self.programmeID)):
                rowData = [self.programmeID, 2, dbc.intDefault(),
                           pTable.tableID, joinCriterion, neighbourTableName,
                           self.programmeID]
                self.archive.insertData(
                    "RequiredNeighbours", rowData, enforcePKC=True)

        Logger.addMessage("Make sure you run ProgrammeBuilder and CU16 ....")
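        # Example of the neighbour-table naming above (hypothetical
        # programme): with getSourceTable() = 'vmcSource', a pTable.tableName
        # of 'vmcVariables' and acronym 'vmc', the stripped remainder is
        # 'Variables', giving neighbourTableName = 'vmcSourceXVariables'.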
        #ProgrammeBuilder(str(self.programmeID), database=self.archive.database,
        #                 isTrialRun=True, userName=self.archive.userName).run()

#------------------------------------------------------------------------------
# Entry point for script.

# Allow module to be imported as well as executed from the command line
if __name__ == '__main__':
    # Define additional command-line options for Cu29
    CLI.progArgs.append(CLI.Argument('programmeID', 'LAS'))
    #CLI.progOpts += [
    #    CLI.Option('e', 'extsurvey', 'only for this external survey e.g. MGC',
    #               'NAME/ID'),
    #    CLI.Option('n', 'table', 'only for this neighbour table', 'NAME'),
    #    CLI.Option('o', 'outgest', 'outgest from the default database (%s), '
    #               'rather than this database' % DbSession.database)]

    cli = CLI(Cu29, '$Revision: 9823 $')
    Logger.isVerbose = False
    Logger.addMessage(cli.getProgDetails())

    cu = Cu29(cli.getArg('programmeID'), cli=cli)
    cu.run()

#------------------------------------------------------------------------------
# Change log:
#
# 29-Nov-2005, RSC: Original version of the new C++ matching code CU16 version
# 13-Dec-2005, RSC: Initial check-in of "working" test version
# 13-Dec-2005, RSC: Major redesign of functions and redesign of dtd files
# 14-Dec-2005, RSC: * Reverted to original design of dtd files, which are now
#                     sorted in the correct order.
#                   * Improved error handling.
#                   * Minor outgest bug fix.
# 15-Dec-2005, RSC: * File transfer bug fix - now using "cp" instead of "mv"
#                     - expected file size is checked before transfer
#                     - more error checking for transfers
#                   * Separate outgest directory on ahmose for easy clean-up
#                   * SSA outgest bug fixed
#                   * Preparing to use TestWSArsc for ingest testing
#  2-Feb-2006, RSC: * Timestamps added to database outgest files on ahmose
#                   * Debugged ingest
#                   * Removed code debug features
#                   * Added helpful comments and dbName to logfile
# 10-Feb-2006, RSC: Working version - save for Sloan cross-catalogue matches
#                   * Various ingest bug fixes
#                     - tables now emptied before ingest
#                     - overcame slow windows nfs issues
#                     - working Sloan issue solution - but way too slow,
#                       so commented out for now
#                   * Script now tidies up its own mess
#                   * The unlikely case of an empty catalogue is now
#                     handled well
#                   * Temporary mods for test purposes
# 13-Feb-2006, RSC: Sloan cross-matches now work
# 16-Feb-2006, RSC: * Adapted code to adopt the new OO design for curation
#                     tasks
#                   * Neighbour tables are now dropped/recreated using the
#                     SQL wrapper, instead of just emptying the existing table
#                   * Neatened up use of RequiredNeighbours table
#                   * Improved script execution
# 17-Feb-2006, RSC: Improved OO design splitting functionality into separate
#                   objects in separate files in preparation for install
# 11-Apr-2006, RSC: * Curator name is now a command line option
#                   * Neatened up exception handling
# 11-May-2006, RSC: Significant overhaul of design to allow outgest of
#                   detection as well as source tables for neighbours
# 27-Jun-2006, RSC: Upgraded to use the new command-line interface.
# 10-Jul-2006, RSC: Deprecated the old CU16 now that the new CU16 has proven
#                   itself for DR1. A working version of the old CU16 can
#                   still be obtained by checking out the DR1_FINAL tag in CVS
#  2-Aug-2006, RSC: Upgraded to latest CuSession OO framework design.
#  4-Aug-2006, RSC: Upgraded to handle separate files for different
#                   programmes' neighbour table schemas.
# 11-Apr-2007, RSC: Upgraded for new RequiredNeighbours schema to allow cross-
#                   matches between UKIDSS surveys.