Data Import

This module provides a set of classes that define the logic and the tools involved in importing xls or csv files.
It assumes that every file is composed of one or more sheets, each represented by a :class:`.Sheet`.

Below an example of import operation ::

    from jmb.core.utils.data_import import XlsFile

    class MyImportData(XlsFile):
        fields_map = {
            'code' : 'ancode',
            'description' : 'andescr',
            'foo' : 'anfoo',
        }

        def do(self, row, kw, j):
             # alternative ways of referencing data
             Contact.objects.get_or_create(code=row.ancode, defaults=kw)
             # Contact.objects.get_or_create(code=kw['code'], defaults=kw)

    x = MyImportData('anagrafiche.xls', auto=True)


Column names can be assigned explicitly through the attribute :attr:`Sheet.field_list`; the columns
can then be mapped to field names with :attr:`Sheet.fields_map`.

from __future__ import unicode_literals

import re
import datetime
from collections import namedtuple
# import csv

import unicodecsv as csv

# Ogg = namedtuple('Ogg', ",".join(attributes))
import xlrd

class Sheet(object):
    """Abstract sheet.

    Concrete subclasses provide :meth:`get_rows` and a
    ``get_tuples_from_row(Row, line)`` method; :meth:`read` drives the import
    by calling :meth:`do` once per row.
    """

    #: a list of field names. Optional. When present it will be used
    #: to name each column instead of looking at the column header.
    #: Field name here is the row attribute (eg: row.first_name)
    field_list = None

    #: a dict mapping field names to column headers, as an example, if
    #: fields_map has {'first_name' : 'Nome'} it means that :meth:`read`
    #: will set::
    #:
    #:    kw['first_name'] = row.Nome
    fields_map = None

    #: Fields listed in this list will be set None if empty
    #: specifically needed to prevent unique problems
    nullable_field_list = None

    def __init__(self, model, opts):
        """Initialise the counters and store ``model`` and ``opts``.

        :param model: stored as ``self.model``
        :param opts: stored as ``self.opts``; a falsy value becomes ``{}``
        """
        self.messages = {
            'inserted': 0,
            'modified': 0,
            'errors': [],
        }
        self.opts = opts or {}
        self.model = model

    def read(self, limit=0, start=0):
        """Loop on all rows of a sheet

        For every selected row :meth:`do` is called; any exception it raises
        is passed to :meth:`handle_exception` and the loop goes on.

        :param start: start reading at line nr. ``start`` (rows are counted
            from 1, so ``0`` and ``1`` both mean "from the first row")
        :param limit: limit number of rows to read to ``limit``
            (``0`` means no limit)
        """
        Row = namedtuple('Row', ",".join(self.field_list))
        # Row numbers are 1-based: clamp ``start`` so that the default
        # ``start=0`` does not shorten the ``limit`` window by one row.
        first = max(start, 1)
        for j, line in enumerate(self.get_rows(), 1):
            if j < first:
                continue
            if limit and j >= first + limit:
                break
            row = self.get_tuples_from_row(Row, line)
            kw = self.get_new_dict(row)
            try:
                self.do(row, kw, j)
            except Exception as e:
                self.handle_exception(e, row, kw, j)

    def handle_exception(self, e, row, kw, j):
        """Handle exception of the execution of :meth:`do`

        The default implementation only prints the row number and the error.

        :param e: the exception raised by :meth:`do`
        :param row: the namedtuple that was being processed
        :param kw: the dict built by :meth:`get_new_dict` for that row
        :param j: 1-based number of the row
        """
        print("ERROR: row: %s - error: %s" % (j, e))

    def setup(self):
        """Hook for subclasses to set up whatever the import needs.

        The default implementation does nothing.
        """
        return

    def get_fk_as_dict(self, model, value='id', key='name'):
        """Return a dict of all values of a (possible) fk as dict

        :param model: the django db model
        :param value: the field to use as value of the dict (default ``id``)
        :param key: the field to use as key of the dict (default ``name``)
            Note: keys are lowercased
        """
        pairs = model.objects.all().values_list(value, key)
        return {label.lower(): pk for pk, label in pairs}

    def get_new_dict(self, row):
        """Build the ``kw`` dict for ``row`` following :attr:`fields_map`.

        :param row: a namedtuple as built by ``get_tuples_from_row``
        :return: ``{}`` when :attr:`fields_map` is empty, otherwise a dict
            with one entry per item of :attr:`fields_map`; falsy values of
            fields listed in :attr:`nullable_field_list` are replaced by None
        :raises AttributeError: when :attr:`fields_map` names a column that
            ``row`` does not have
        """
        if not self.fields_map:
            return {}
        row_dict = {}
        for key, val in self.fields_map.items():
            try:
                row_dict[key] = getattr(row, val)
            except AttributeError:
                # ``vars()`` does not work on namedtuple instances under
                # Python 3; ``_fields`` is available on every version.
                choices = list(getattr(row, '_fields', ()))
                msg = "'%s' is not a correct value, choices are: %s" % (val, choices)
                raise AttributeError(msg)
        # nullable_field_list may still be the class default (None)
        for field_name in self.nullable_field_list or ():
            if field_name in row_dict and not row_dict[field_name]:
                row_dict[field_name] = None
        return row_dict

    def get_rows(self, sheet_index=None, sheet=None):
        """Return an iterable over the raw rows; must be overridden.

        :raises NotImplementedError: always, in this abstract class
        """
        raise NotImplementedError

    def do(self, row, kw, j):
        """implement here your import

        The default implementation just prints ``kw``.

        :param row: a namedtuple. Names are the names in the original columns
        :param kw: a dict suitable to feed ``defaults`` keyword attribute
                  of ``get_or_create`` if fields_map is provided::

                     def do(self, row, kw, j):
                         Contact.objects.get_or_create(cod=row.cod, defaults=kw)

        :param j: integer 1-based counting read rows
        """
        print(kw)

    def clean_field_name(self, value):
        """Clean each column heading to make it a suitable field_name.

        Every character that is not a word character (as defined by the
        regex class ``\\w``) is removed: this strips spaces, parentheses
        and dots.

        :param value: the column header
        """
        return re.sub(r'[\s\(\)\.\W]', '', value)
class XlsFile(Sheet):
    """A :class:`Sheet` whose rows come from a workbook opened with ``xlrd``."""

    #: index of the first row that holds data; set to 1 by ``__init__``
    #: when the first row is consumed as column headers (``auto=True``)
    data_rows_index = 0

    def __init__(self, filename=None, file_contents=None, auto=False, sheet_index=0, opts=None, model=None):
        """
        :param filename: the xls file to open
        :param file_contents: content of the workbook, handed to
            ``xlrd.open_workbook(file_contents=...)``; wins over ``filename``
            when both are given
        :param auto: (boolean) when True, column names are taken from
            the first row
        :param sheet_index: index of the sheet to use (default: 0)
        :param opts: passed to :class:`Sheet`
        :param model: passed to :class:`Sheet`
        :raises ValueError: when neither ``filename`` nor ``file_contents``
            is given
        """
        if not filename and not file_contents:
            raise ValueError("either 'filename' or 'file_contents' is required")
        try:
            if filename:
                self.book = xlrd.open_workbook(filename)
            if file_contents:
                self.book = xlrd.open_workbook(file_contents=file_contents)
        except IOError as e:
            print(e)
            raise

        self.sheet = self.book.sheet_by_index(sheet_index)
        # NOTE(review): the right-hand side of this assignment is an
        # assumption (taken from the parameter name) - confirm.
        self.sheet_index = sheet_index
        if auto:
            self.field_list = self.get_field_list(self.sheet.row(0))
            self.data_rows_index = 1
        self.setup()
        self.nullable_field_list = self.nullable_field_list or []
        super(XlsFile, self).__init__(model=model, opts=opts)

    def xls2csv(self):
        "Really poor version of csv file"
        return "\n".join(
            ";".join(str(cell.value) for cell in row)
            for row in self.get_sheet_as_list()
        )

    def get_sheet_as_list(self, sheet_name=None, all_sheets=True):
        """get named sheet or all sheets

        :param sheet_name: when given, only the rows of this sheet are returned
        :param all_sheets: when True (and no ``sheet_name``) the rows of every
            sheet of the workbook are returned, otherwise a single sheet
        :return: a list of rows as yielded by :meth:`get_rows`
        """
        if sheet_name:
            sheets = [self.book.sheet_by_name(sheet_name)]
        elif all_sheets is True:
            sheets = [self.book.sheet_by_index(index)
                      for index in range(self.book.nsheets)]
        else:
            # NOTE(review): assumed to be the sheet selected in __init__ - confirm.
            sheets = [self.book.sheet_by_index(self.sheet_index)]

        rows = []
        for sheet in sheets:
            # pass the sheet by keyword: the first positional argument of
            # get_rows is an index, not a sheet object
            rows.extend(self.get_rows(sheet=sheet))
        return rows

    def get_rows(self, sheet_index=0, sheet=None):
        """Yield the rows of a sheet starting from ``data_rows_index``.

        :param sheet_index: the index of the sheet to read, used only when
            neither ``sheet`` nor ``self.sheet`` is available
        :param sheet: a possible sheet object as returned by sheet_by_index
        """
        if sheet is None:
            sheet = self.sheet or self.book.sheet_by_index(sheet_index)
        for nrow in range(self.data_rows_index, sheet.nrows):
            yield sheet.row(nrow)

    def xldate2date(self, cell):
        """Return a ``datetime.date`` from the value of the cell.

        Only year, month and day are kept.

        :param cell: a cell whose ``ctype`` is 3 (date)
        :raises ValueError: when the cell is not a date cell
        """
        if cell.ctype != 3:
            raise ValueError("not a date cell (ctype=%s)" % cell.ctype)
        # the workbook knows which epoch (1900 or 1904) its dates use
        date_tuple = xlrd.xldate_as_tuple(cell.value, self.book.datemode)
        return datetime.date(*date_tuple[0:3])

    def get_tuples_from_row(self, Row, row):
        """Build a ``Row`` from the converted values of the cells.

        :param Row: typename
        :param row: iterable
        """
        return Row._make(self.get_cell_value(cell) for cell in row)

    def get_cell_value(self, cell):
        """Return the best guess for the correct value"""
        ctype = cell.ctype
        if ctype in (0, 6):  # empty / blank cell
            return ''
        if ctype == 1:  # string
            return cell.value.strip()
        if ctype == 2:  # float
            return cell.value
        if ctype == 3:  # date/datetime
            return self.xldate2date(cell)
        if ctype == 4:  # boolean
            return bool(cell.value)
        # any other cell type (e.g. 5, error) has no usable value
        return None

    def get_field_list(self, row):
        """Return the column names.

        :attr:`field_list` wins when set, otherwise names are the cleaned
        values of the cells of ``row``.
        """
        if self.field_list:
            return self.field_list
        return [self.clean_field_name(self.get_cell_value(cell)) for cell in row]
class CsvFile(Sheet):
    """A :class:`Sheet` whose rows come from a csv file."""

    def __init__(self, filename, auto=False, delimiter=";", opts=None, model=None):
        """
        :param filename: the csv file to open
        :param auto: (boolean) when True, column names are taken from
            the first row
        :param delimiter: field delimiter (default ``;``)
        :param opts: passed to :class:`Sheet`
        :param model: passed to :class:`Sheet`
        """
        # Keep a reference to the file so that :meth:`close` can release it;
        # the reader consumes it lazily while rows are being read.
        self._file = open(filename, "rb")
        try:
            self.reader = csv.reader(self._file, delimiter=delimiter)
            if auto:
                self.field_list = self.get_field_list(next(self.reader))
            self.setup()
            self.nullable_field_list = self.nullable_field_list or []
            super(CsvFile, self).__init__(model=model, opts=opts)
        except Exception:
            self._file.close()
            raise

    def close(self):
        """Close the underlying file; call it when the import is over."""
        self._file.close()

    def get_tuples_from_row(self, Row, row):
        """Build a ``Row`` from the first ``len(field_list)`` values of ``row``.

        :param Row: typename
        :param row: iterable
        """
        return Row._make(row[:len(self.field_list)])

    def get_rows(self, sheet_index=None, sheet=None):
        """Return the csv reader.

        Both parameters are ignored; they are accepted only to match the
        signature declared by :class:`Sheet`.
        """
        return self.reader

    def get_field_list(self, row):
        """Return the column names.

        :attr:`field_list` wins when set, otherwise names are the stripped
        and cleaned values of ``row``.
        """
        if self.field_list:
            return self.field_list
        return [self.clean_field_name(f.strip()) for f in row]
if __name__ == '__main__':
    # Small manual demo of CsvFile.
    # NOTE(review): the line breaks and tabs inside the two samples below are
    # assumed (one record per line, tab-separated for ``tab``) - confirm.
    import sys
    import os

    simple = '''"nome", "cognome", "eta"
mario, rossi, 1
nicola, di bari, 2
eufrasio, nibionno
manca, il campo,
'''

    tab = """nome\tcognome\teta
mario\trossi\t1
nicola\tdi bari\t2
eufrasio\tnibionno
manca\til campo\t
"""

    def get_file(txt):
        """Write ``txt`` to a scratch csv file and return its path."""
        filename = "/tmp/k.csv"
        with open(filename, "w") as f:
            f.write(txt)
        return filename

    if len(sys.argv) > 1:
        filename = sys.argv[1]

    class MyPage(CsvFile):
        def do(self, row, kw, j):
            print(row.eta)

    # p = MyPage(filename=get_file(simple), delimiter=",", auto=True)
    # p = MyPage(filename=get_file(tab), delimiter="\t", auto=True)