"""
Invoke CU29. Ingests internal catalogues created by survey teams for
specific programmes and then matches these to the main source table.
Not the same as CU16, which matches external surveys to VDFS programmes.
Two types of join to the source table are created. If the table has a
one-to-one match, then a lookup table from the new catalogue ID to the
sourceID is created. If it is one-to-many, then a neighbour table is used.

Ingest internal catalogues and create neighbour and lookup join tables
for a given programme.

@author: N.J.G. Cross
@org: WFAU, IfA, University of Edinburgh
"""
21
22 from __future__ import division, print_function
23 import os
24 from collections import defaultdict, namedtuple
25
26 from wsatools.CLI import CLI
27 from wsatools.DbConnect.CuSession import CuSession
28 import wsatools.DbConnect.DbConstants as dbc
29 from wsatools.DbConnect.DbSession import Ingester
30 import wsatools.DbConnect.Schema as schema
31 from wsatools.Logger import Logger
32
33
class Cu29(CuSession):
    """
    Ingest internal catalogues and create neighbour and lookup join tables
    for a given programme.

    """
    # Curation-use number identifying this task in the archive.
    cuNum = 29

    # Commit each database statement as it happens rather than in a single
    # final transaction -- presumably so partial ingests survive a crash;
    # TODO confirm against CuSession semantics.
    _autoCommit = True
    _isPersistent = True

    # Placeholder for an outgester session; not used in the visible code but
    # retained for interface compatibility.
    _outgester = None
59 """ Ingest internal catalogues and create neighbour and lookup join tables.
60 """
61
62
63
64
65
66 externCatInfo = self.archive.query("*", "ExternalProduct",
67 "programmeID=%s" % self.programmeID)
68
69
70
71
72
73 self.areVariableTypes(externCatInfo)
74 for externCat in externCatInfo:
75
76 self.ingestCats(externCat)
77 for externCat in externCatInfo:
78
79 self.matchCats(externCat)
80
81
82
83
85 """ Checks to see if there is a set of variables tables that should be merged
86 into variable types table
87 """
88
89 posVarTables = [externCat.tableName for externCat in externCatInfo
90 if 'variables' in externCat.tableName.lower()]
91
92 self.secToMainDict = {}
93 for tableName in posVarTables:
94 schemaFile = [externCat.sqlSchemaFile for externCat in externCatInfo
95 if externCat.tableName == tableName].pop()
96 table = schema.parseTables(schemaFile, [tableName])[0]
97 for constraint in table.constraints:
98
99
100 if constraint.name.startswith("fk_"):
101
102 mainTableName = constraint.name.split('_')[4]
103 self.secToMainDict[tableName] = mainTableName
104
105
106
108 """ Checks to see if catalogues have been ingested
109 """
110
111
112 nonIngestedFiles = self.archive.query(
113 "fieldID,fileName,versNum",
114 "ExternalProductCatalogue",
115 "programmeID=%s and productType='%s' and isIngested=0"
116 % (self.programmeID, externCat.productType))
117 if len(nonIngestedFiles) > 0:
118 Logger.addMessage("%s %s files to ingest..." %
119 (len(nonIngestedFiles), externCat.productType))
120 ecSchema = schema.parseTables(externCat.sqlSchemaFile)
121 try:
122 ingester = Ingester(self.archive, ecSchema, self.shareFileID)
123 except schema.MismatchError as error:
124 raise Cu29.CuError(error)
125
126 for niFile in nonIngestedFiles:
127 mainTable = (self.secToMainDict[externCat.tableName]
128 if externCat.tableName in self.secToMainDict else
129 None)
130 csvFileNames = self.createIngestableFile(niFile, externCat, mainTable)
131 for csvFileName, tableName in csvFileNames:
132 ingester.ingestTable(tableName, csvFileName,
133 isCsv=True)
134 self.archive.update("ExternalProductCatalogue",
135 [("isIngested", 1)], where="programmeID=%s and productType='%s' "
136 "and fileName='%s'" % (self.programmeID, externCat.productType,
137 niFile.fileName))
138 else:
139 Logger.addMessage("No files need to be ingested!")
140 return
141
142
143
144
145
147 """ Create a csv file that is ingestable
148 """
149 externFileData = file(os.path.join(self.sysc.extProdDir, 'teamCats',
150 externCat.directory, notIngFile.fileName)).readlines()
151 tableSchema = schema.parseTables(externCat.sqlSchemaFile, [externCat.tableName])[0]
152 externFileHeaderInfo = self.parseHeader(externFileData)
153 mainID = [column.name for column in tableSchema.columns
154 if 'id' in column.name.lower() and '--/C' in column.tag and column.tag['--/C'] == 'meta.id;meta.main'][0]
155 if mainTableName:
156 currentID = self.archive.query("max(%s)" % mainID, mainTableName,
157 firstOnly=True, default=0) + 1
158 varType = externCat.tableName.lower().replace(
159 self.programme.getAcronym().lower(), '').replace('variables', '')
160 mainOutputLines = []
161 else:
162 currentID = self.archive.query("max(%s)" % mainID, externCat.tableName,
163 firstOnly=True, default=0) + 1
164 outputLines = []
165 for line in externFileData:
166 if not line.startswith('#'):
167 parts = self.breakIntoParts(line, externFileHeaderInfo)
168 outData = []
169 if mainTableName:
170 mainOutData = []
171 for column in tableSchema.columns:
172 if 'id' in column.name.lower() and '--/C' in column.tag and column.tag['--/C'] == 'meta.id;meta.main':
173 outData.append(currentID)
174 if mainTableName:
175 mainOutData.append(currentID)
176 currentID += 1
177 elif column.name == 'fieldID':
178 outData.append(notIngFile.fieldID)
179 elif column.name == 'cuEventID':
180 outData.append(self.cuEventID)
181 if mainTableName:
182 mainOutData.append(self.cuEventID)
183 elif '--/F' in column.tag and column.tag['--/F'].lower() in externFileHeaderInfo.nameDict:
184 value = parts[externFileHeaderInfo.nameDict[column.tag['--/F'].lower()]]
185 if column.tag['--/F'].lower() in externFileHeaderInfo.defaultDict and \
186 value == externFileHeaderInfo.defaultDict[column.tag['--/F'].lower()]:
187
188 value = str(column.getDefaultValue())
189 if value == '':
190 value = str(column.getDefaultValue())
191 outData.append(value)
192 if mainTableName and (column.name == 'ra' or column.name == 'dec'):
193 mainOutData.append(value)
194 else:
195 outData.append(column.getDefaultValue())
196 if mainTableName:
197 mainOutData.append(varType)
198 mainOutputLines.append(','.join(map(str, mainOutData)) + '\n')
199 outputLines.append(','.join(map(str, outData)) + '\n')
200 outputFiles = []
201 if mainTableName:
202 ingestFileName = self.archive.sharePath("%s_%s.csv" %
203 (mainTableName, self.shareFileID))
204 file(ingestFileName, 'w').writelines(mainOutputLines)
205 outputFiles.append((ingestFileName, mainTableName))
206 ingestFileName = self.archive.sharePath("%s_%s.csv" %
207 (externCat.tableName, self.shareFileID))
208 file(ingestFileName, 'w').writelines(outputLines)
209 outputFiles.append((ingestFileName, externCat.tableName))
210 return outputFiles
211
212
214 """ Breaks line into correct parts
215 """
216 parts = []
217 if externFileHeaderInfo.isFixedLength:
218 for ii in range(len(externFileHeaderInfo.nameDict)):
219 partName = [name for name in externFileHeaderInfo.nameDict
220 if externFileHeaderInfo.nameDict[name] == ii][0]
221 parts.append(' '.join(
222 line[externFileHeaderInfo.startEndDict[partName][0] - 1:
223 externFileHeaderInfo.startEndDict[partName][1]].split()))
224 else:
225 parts = line.split()
226 return parts
227
228
229
231 """ Parse the header to get information
232 """
233
234 Header = namedtuple("Header", "isFixedLength nameDict startEndDict defaultDict")
235 headerLines = [line for line in fileData
236 if line.startswith('#')]
237 isFixedLength = False
238 nameDict = {}
239 defaultDict = {}
240 startEndDict = None
241 if headerLines[0].startswith('# ASCII'):
242 for ii, line in enumerate(headerLines):
243 if ii > 0 and len(line) >= 3:
244 name = line.split()[1].lower()
245 nameDict[name] = (ii - 1)
246 if 'DEF=' in line:
247 defaultDict[name] = line.split('DEF=')[1].split()[0]
248
249
250
251 elif headerLines[0].startswith('# FIXED LENGTH'):
252 isFixedLength = True
253 startEndDict = defaultdict(list)
254 for ii, line in enumerate(headerLines):
255 if ii > 0 and len(line) >= 3 and len(line.split()) > 1:
256 name = line.split()[1].lower()
257 nameDict[name] = (ii - 1)
258 startEndDict[name] = [int(point) for point in line.split()[2].split('-')]
259 if 'DEF=' in line:
260 defaultDict[name] = line.split('DEF=')[1].split()[0]
261 return Header(isFixedLength, nameDict, startEndDict, defaultDict)
262
263
264
266 """ Match to source table - either directly or with neighbour table.
267 """
268
269 self.setProgrammeTable(externCat)
270 self.setupSchema(externCat)
271
272
273
274
322
323
324
326 """ Create schema from template.
327 """
328
329
330
331
332
333 tableName = (self.secToMainDict[externCat.tableName]
334 if externCat.tableName in self.secToMainDict
335 else externCat.tableName)
336 pTable = self.archive.query("*", "ProgrammeTable", "programmeID=%s AND "
337 "tableName='%s'" % (self.programmeID, tableName),
338 firstOnly=True)
339 if externCat.sourceMatched:
340
341
342
343 pass
344 else:
345
346 joinCriterion = 1. / 3600.
347 neighbourTableName = '%sX%s' % (self.programme.getSourceTable(),
348 pTable.tableName.replace(self.programme.getAcronym().lower(),
349 ''))
350 if not self.archive.queryEntriesExist("RequiredNeighbours",
351 "programmeID=%s AND tableID=2 AND surveyID=%s AND "
352 "extTableID=%s AND extProgID=%s" % (self.programmeID,
353 dbc.intDefault(), pTable.tableID, self.programmeID)):
354 rowData = [self.programmeID, 2, dbc.intDefault(), pTable.tableID,
355 joinCriterion, neighbourTableName, self.programmeID]
356 self.archive.insertData(
357 "RequiredNeighbours", rowData, enforcePKC=True)
358 Logger.addMessage("Make sure you run ProgrammeBuilder and CU16 ....")
359
360
361
362
363
364
if __name__ == '__main__':
    # Entry point: run CU29 for the programme named on the command line
    # (defaults to the LAS).
    CLI.progArgs.append(CLI.Argument('programmeID', 'LAS'))

    cmdLine = CLI(Cu29, '$Revision: 9823 $')
    Logger.isVerbose = False
    Logger.addMessage(cmdLine.getProgDetails())

    curationTask = Cu29(cmdLine.getArg('programmeID'), cli=cmdLine)
    curationTask.run()
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435