-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathimport.py
executable file
·204 lines (178 loc) · 6.13 KB
/
import.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
#!/usr/bin/env python2.6
# vi:sw=2:ts=2:expandtab
# Copyright (C) 2009-2021 Ian Munsie
#
# Kosh is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Kosh is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Kosh. If not, see <http://www.gnu.org/licenses/>.
import sys
import csv
def promptOptions(prompt, options, default, help=None):
  """
  TTY user interaction routine
  Ask the user to choose one of several options and keep asking until a valid
  choice is made. An empty answer selects the default, which is shown in upper
  case in the prompt. Answers are lower-cased before being compared with the
  options.

  help, when given, is a sequence parallel to options holding a description of
  each option; the user can then answer '?' to have the descriptions printed.

  Returns the selected option (or default for an empty answer).
  """
  def describeOptions():
    # One "option: description" line per option, in the order given
    for (position, option) in enumerate(options):
      print('%s: %s'%(option, help[position]))

  labels = []
  for option in options:
    if option == default:
      labels.append(option.upper())
    else:
      labels.append(option.lower())
  choices = ','.join(labels)
  if help:
    choices += ',?'
  question = '%s (%s): '%(prompt, choices)

  while True:
    answer = input(question).lower()
    if answer == '':
      return default
    if answer in options:
      return answer
    if help and answer == '?':
      describeOptions()
def confirm(prompt):
  """
  TTY user interaction routine
  Ask a yes/no question where yes is the default answer. Returns True when the
  user chose yes, False otherwise.
  """
  answer = promptOptions(prompt, ('y','n'), 'y')
  return answer == 'y'
def prompt(prompt):
  """
  TTY user interaction routine
  Show the prompt text followed by ': ' and return whatever the user typed.
  There is no default value - an empty answer is returned as an empty string.
  """
  question = prompt + ': '
  return input(question)
# FIXME: Use database constants and (possibly) list of all previously used values
# Field names the importer recognises, in their canonical spelling. They are
# matched case-insensitively by recognisedField() and translateCase() maps a
# match back to the spelling used here.
knownFields = ['name','Username','Password','URL','Notes']
def recognisedField(field):
  """
  Return True if field matches one of knownFields, ignoring case.
  """
  wanted = field.lower()
  return any(known.lower() == wanted for known in knownFields)
def translateCase(field):
  """
  Return field in the canonical spelling used by knownFields when it matches
  one of them (ignoring case); any other field is returned unchanged.
  """
  wanted = field.lower()
  for known in knownFields:
    if known.lower() == wanted:
      return known
  return field
def askHeader(fp, cfp, field):
  """
  Prompt user for the name of a particular field header.

  The user is shown a sample of data taken from a record that has a value in
  this field and can request additional examples; once the records run out the
  examples start again from the position fp was at on entry. If no record has
  any data in the field this prints a notice and returns None without
  prompting. Any non standard field name is confirmed with the user to avoid
  surprises if they misspelled something.

  fp    -- file object that cfp reads from; must support tell() and seek().
           It is always seeked back to its position on entry before returning.
  cfp   -- iterator yielding the records (sequences of field values) from fp
  field -- zero based index of the field within each record

  Returns the name to use for the field, or None if the field is to be
  disregarded. Recognised names are returned in their canonical case, other
  names are returned lower-cased.

  TTY only for now
  """
  loc = fp.tell()
  # Single element list so the nested function can update the flag
  found = [False]

  def nextExample():
    """
    Get next example data or return None if no record is found containing
    data for this field.
    """
    while True:
      try:
        line = next(cfp)
      except StopIteration:
        if not found[0]:
          print('No data in field %i, disregarding'%field)
          return None
        # At least one record has data, so wrap around and keep cycling
        fp.seek(loc)
        continue
      # Blank lines and short records simply have no data for this field
      if field < len(line) and line[field]:
        found[0] = True
        return line[field]

  try:
    example = nextExample()
    if not example: return None
    while True:
      # TODO: Use readline to provide suggestions
      name = prompt('Field %i (example of data: %s)' % (field, example)).lower()
      if not name:
        ret = promptOptions('Disregard field %i?'%field, ('y','n','e'),'n',
            ('Yes','No','show next Example'))
        if ret == 'y': return None
        if ret == 'e': example = nextExample()
      elif recognisedField(name) or \
          confirm('Field name %s is not recognised, use anyway?'%name):
        return translateCase(name)
  finally:
    # Leave the file where the caller had it, whichever way we exit
    fp.seek(loc)
def askHeaders(fp, cfp, headers):
  """
  Work out the header name for every field, asking the user where necessary.

  When headers are supplied they are returned straight away with recognised
  names translated to their canonical case (confirming them with the user is
  not yet implemented). When headers is None the first record read from cfp
  decides how many fields there are, fp is seeked back to where it was, and
  the user is asked to name each field in turn via askHeader.

  Returns the list of header names; an entry is None for a field that is to be
  disregarded.

  TTY only for now
  """
  if headers:
    # TODO: Confirm headers. (TTY mode needs to implement prompt with default value - readline)
    # TODO: Strip blank headers and fields
    return [ translateCase(header) for header in headers ]

  if headers is None:
    # Peek at the first record to count the fields, then rewind
    start = fp.tell()
    firstRecord = next(cfp)
    fp.seek(start)
    headers = [None] * len(firstRecord)

  for index in range(len(headers)):
    headers[index] = askHeader(fp, cfp, index)
  return headers
def importCSV(filename, headers, db):
  """
  Import every record of a CSV file into the password database.

  The CSV dialect and the presence of a header row are guessed from the first
  1024 characters of the file. If no headers were passed in and the file
  appears to have a header row, that row supplies the headers; the headers are
  then resolved through askHeaders, which may prompt the user. Each remaining
  record is turned into a dictionary of header -> value (empty values and
  disregarded fields are left out) and handed to db.importEntry(). A '.' is
  written to stdout for each entry the database reports as imported and an 'x'
  for each one it does not. db.write() is called once all records have been
  processed.

  filename -- path of the CSV file to import
  headers  -- list of header names, or None to detect/ask for them
  db       -- database object providing importEntry(entry) and write()
  """
  import io
  # The csv module works on text and wants the file opened with newline=''.
  # The contents are copied into a StringIO because csv.reader iterates the
  # file with next() and text mode files refuse tell() while being iterated,
  # yet askHeaders/askHeader depend on tell()/seek(). Working on a copy also
  # means the input file itself need not be seekable.
  with open(filename, 'r', newline='') as source:
    fp = io.StringIO(source.read(), newline='')

  sample = fp.read(1024)
  fp.seek(0)
  sniffer = csv.Sniffer()
  dl = sniffer.sniff(sample)
  has_header = sniffer.has_header(sample)
  del sample

  cfp = csv.reader(fp, dl)
  if not headers and has_header:
    headers = next(cfp)
  # else - verify correct number of headers passed in

  headers = askHeaders(fp, cfp, headers)

  print('Importing... ')
  for line in cfp:
    entry = {}
    # zip() stops at the shorter of the two, so a record with more fields than
    # there are headers has its surplus fields ignored rather than crashing
    for (header, field) in zip(headers, line):
      if header and field:
        entry[header] = field
    #print 'Importing',entry
    if db.importEntry(entry):
      sys.stdout.write('.')
    else:
      sys.stdout.write('x')
  db.write()
  print(' done')
passdb_default = '~/.kosh/koshdb' # FIXME: Also declared in kosh

def getParameters():
  """
  Parse the command line.

  The first positional argument is the CSV file to import and is required -
  without it the usage message is printed and the program exits. Any further
  positional arguments are taken as the header names of the CSV fields. The
  password database is selected with -d/--database and defaults to
  passdb_default.

  Returns the optparse options object with these attributes set:
    passdb      -- path of the password database
    CSVFilename -- path of the CSV file to import
    headers     -- list of header names, or None if none were passed
  """
  # FIXME: Perhaps combine with main kosh getParameters...
  # or git approach is somewhat appealing, but for now:
  from optparse import OptionParser
  usage = 'usage: %prog [-d passdb] CSVfile [header...]'
  parser = OptionParser(usage=usage)
  parser.add_option('-d', '--database', dest='passdb', default=passdb_default,
      help='password database to import into (default: %default)')
  (options, args) = parser.parse_args()

  if not args:
    # parser.error() prints the usage and exits, it does not return
    parser.error('Did not pass CSV file')
  options.CSVFilename = args.pop(0)
  options.headers = args or None
  return options
def main():
  """
  Script entry point: parse the command line, open the password database
  (asking for the passphrase through getpass) and import the CSV file into it.
  """
  import os
  import getpass
  import koshdb
  options = getParameters()
  # Open the database before entering the try block: if opening fails there is
  # nothing to clean up, and the finally clause must not touch an unbound db
  # (which would hide the real error behind an UnboundLocalError).
  db = koshdb.KoshDB(os.path.expanduser(options.passdb), getpass.getpass)
  try:
    importCSV(options.CSVFilename, options.headers, db)
  finally:
    db.__del__() # FIXME
# Only run the importer when executed as a script, not when imported
if __name__ == '__main__':
  main()