Source code for data2neo.relational_modules.sqlite

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
Implementation for pandas as the relational module. 

authors: Julian Minder
"""

from typing import List, Iterable, Dict, Any
from .. import ResourceIterator
from .. import Resource
import sqlite3
from collections import defaultdict
from queue import Queue
import logging

logger = logging.getLogger(__name__)


[docs]class SQLiteResource(Resource): """Implementation of the sqlite Resource. Enables access to a row of a sqlite table"""
[docs] def __init__(self, data: Iterable, cols: List[str], pks: List[str], table: str) -> None: """Wraps a row of a sqlite table and is used as input data object for the converter. May hold additional supplies to pass data between factories. Args: data: Wrapped row of a sqlite table. cols: List of column names. pks: List of primary keys. table: Name of table that this row is an entity of. """ super().__init__() self._data = list(data) self._type = table # convert to dict for fast access self._cols = dict((col, i) for i,col in enumerate(cols)) self._pks = tuple(pks)
@property def type(self) -> str: """Returns the type of the resource. Is used to select correct factory""" return self._type @property def pks(self) -> Dict[str, Any]: """Returns the primary keys of the resource""" return dict((pk, self[pk]) for pk in self._pks) @property def cols(self) -> List[str]: """Returns the columns of the resource""" return list(self._cols.keys()) def __getitem__(self, key: str) -> str: """ Gets the value with key 'key'. """ return self._data[self._cols[key]] def __setitem__(self, key: str, value: Any) -> None: """ Sets the value of with key 'key'. """ if key in self._cols.keys(): self._data[self._cols[key]] = value else: self._cols[key] = len(self._data) self._data.append(value) def __repr__(self) -> str: """ Gets a string representation of the resource. Only used for logging. """ repr = f"{super().__repr__()} (" for pk in self._pks: repr += f"{pk}={self[pk]}," return repr + ")"
[docs]class SQLiteIterator(ResourceIterator): """ Implementation of the sqlite ResourceIterator. Enables iteration over a sqlite database. """
[docs] def __init__(self, database: sqlite3.Connection, filter: List[str] = None, primary_keys: Dict[str, List[str]] = None, mix_tables: bool = True, buffer_size: int = 5000, lock: "Lock" = None) -> None: """Initializes the iterator. Opens a connection to the database and saves the tables that should be iterated over. Args: database: Sqlite database connection to iterate over. filter: List of tables to iterate over. If None all tables in database are used. Defaults to None. primary_keys: Dict of primary keys for each table. If None the primary keys are determined automatically. Defaults to None. mix_tables: If True, the iterator will mix the tables. If False, the iterator will iterate over all tables in order. Defaults to True. buffer_size: Size of the buffer for each table. Defaults to 5000. lock: Lock object to synchronize the access to the sqlite database. Defaults to None. Raises: Exception: If no primary key is found for a table. """ super().__init__() assert isinstance(database, sqlite3.Connection), "Please pass a sqlite3.Connection object to the iterator." self._con = database self._con_lock = lock all_tables = [table[0] for table in self._con.execute( 'SELECT name from sqlite_master where type= "table"' ).fetchall()] self._tables = all_tables if filter is not None: self._tables = filter logger.info(f"Iterating over tables: {self._tables}") self._cols = {} self._pks = {} try: if self._con_lock is not None: self._con_lock.acquire() for table in self._tables: cols = self._con.execute(f"PRAGMA table_info('{table}');").fetchall() self._cols[table] = [col[1] for col in cols] if primary_keys is not None and table in primary_keys.keys(): self._pks[table] = primary_keys[table] else: self._pks[table] = [col[1] for col in cols if col[5]] if len(self._pks[table]) == 0: raise ValueError(f"Table '{table}' has no primary key, which is required for the conversion. Please add a primary key to the table or specify it manually when instatiating the Iterator.") finally: if self._con_lock is not None: self._con_lock.release() self._cursors = {} self._len = None self._mix_tables = mix_tables self._buffer_size = buffer_size
def _init_cursors(self): """Initiates the cursors for all tables.""" if self._con_lock is not None: self._con_lock.acquire() try: for table in self._tables: self._cursors[table] = self._con.execute(f"SELECT * FROM {table}") finally: if self._con_lock is not None: self._con_lock.release() def __iter__(self): """Returns an iterator over the database.""" self._init_cursors() buffer = defaultdict(Queue) while True: for key in self._cursors.keys(): break_flag = False while(True): try: yield buffer[key].get_nowait() except GeneratorExit: return except: # Fetch next 5000 rows if self._con_lock is not None: self._con_lock.acquire() try: res = self._cursors[key].fetchmany(5000) finally: if self._con_lock is not None: self._con_lock.release() for row in res: buffer[key].put(SQLiteResource(row, self._cols[key], self._pks[key], key)) if buffer[key].empty(): # Remove cursor from dict del self._cursors[key] if len(self._cursors) == 0: return break_flag = True break if self._mix_tables: break if break_flag: break def __len__(self) -> None: """ Returns the number of resources in the database.""" if self._len is None: self._len = 0 if self._con_lock is not None: self._con_lock.acquire() try: for table in self._tables: self._len += self._con.execute(f"SELECT Count(*) FROM {table}").fetchone()[0] finally: if self._con_lock is not None: self._con_lock.release() return self._len