Appearance
Python SQLite
Please read the Python SQLite library documentation. The following snippets are for my own reference.
python
import sqlite3
def create_table(
    db_file: str = 'database.db',
    table_name: str = 'table_name',
    column_names: list = None,
    column_types: list = None,
) -> None:
    """Create a table if it does not already exist.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Name of the table to create. Interpolated into the SQL
            text as-is, so it must come from a trusted source.
        column_names: Column names; defaults to ``['id', 'name']``.
        column_types: Column type/constraint strings, paired with
            ``column_names`` by position; defaults to
            ``['INTEGER PRIMARY KEY', 'TEXT']``. Entries beyond the number
            of column names are ignored.

    Raises:
        IndexError: If ``column_types`` is shorter than ``column_names``.
        sqlite3.Error: If SQLite rejects the statement.
    """
    # None sentinels instead of list literals: default lists would be shared
    # between every call of the function.
    if column_names is None:
        column_names = ['id', 'name']
    if column_types is None:
        column_types = ['INTEGER PRIMARY KEY', 'TEXT']

    column_defs = ', '.join(
        f'{name} {column_types[i]}' for i, name in enumerate(column_names)
    )
    sql = f'CREATE TABLE IF NOT EXISTS {table_name} ({column_defs})'

    conn = sqlite3.connect(db_file)
    try:
        conn.execute(sql)
        conn.commit()
    finally:
        # Close even when execute() raises so the handle is not leaked.
        conn.close()
python
import sqlite3
def insert_data(db_file: str, table_name: str, data: list) -> None:
    """Insert one row into a table using positional placeholders.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Table to insert into. Interpolated into the SQL text
            as-is, so it must come from a trusted source.
        data: Values for the new row, one per table column; they are bound
            as parameters rather than interpolated.

    Raises:
        sqlite3.Error: If the statement fails (unknown table, wrong number
            of values, constraint violation, ...).
    """
    placeholders = ', '.join(['?'] * len(data))
    sql = f'INSERT INTO {table_name} VALUES ({placeholders})'

    conn = sqlite3.connect(db_file)
    try:
        conn.execute(sql, data)
        conn.commit()
    finally:
        # Close even when execute() raises so the handle is not leaked.
        conn.close()
python
import sqlite3
def select_data(
    db_file: str,
    table_name: str,
    column_names: list = None,
    where: str = '',
) -> list:
    """Run a SELECT against one table and return every matching row.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Table to read from.
        column_names: Column expressions to select; defaults to ``['*']``.
        where: Optional condition placed after ``WHERE``; omitted when
            empty.

    Returns:
        A list of row tuples as produced by ``fetchall()``.

    Raises:
        sqlite3.Error: If SQLite rejects the statement.

    Note:
        ``table_name``, ``column_names`` and ``where`` are interpolated into
        the SQL text as-is, so they must come from a trusted source.
    """
    # None sentinel instead of a list literal: a default list would be
    # shared between every call of the function.
    if column_names is None:
        column_names = ['*']

    sql = f"SELECT {', '.join(column_names)} FROM {table_name}"
    if where:
        sql += f' WHERE {where}'

    conn = sqlite3.connect(db_file)
    try:
        return conn.execute(sql).fetchall()
    finally:
        # Close even when execute() raises so the handle is not leaked.
        conn.close()
python
import sqlite3
def update_data(db_file: str, table_name: str, column_names: list, data: list, where: str) -> None:
    """Update the given columns of every row matching a condition.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Table to update.
        column_names: Columns to assign, in the same order as ``data``.
        data: New values, bound as parameters to the ``SET`` placeholders.
        where: Condition placed after ``WHERE``.

    Raises:
        sqlite3.Error: If SQLite rejects the statement or the number of
            bound values does not match the placeholders.

    Note:
        ``table_name``, ``column_names`` and ``where`` are interpolated into
        the SQL text as-is, so they must come from a trusted source.
    """
    assignments = ', '.join(f'{name} = ?' for name in column_names)
    sql = f'UPDATE {table_name} SET {assignments} WHERE {where}'

    conn = sqlite3.connect(db_file)
    try:
        conn.execute(sql, data)
        conn.commit()
    finally:
        # Close even when execute() raises so the handle is not leaked.
        conn.close()
python
import sqlite3
def delete_data(db_file: str, table_name: str, where: str) -> None:
    """Delete every row of a table that matches a condition.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Table to delete from.
        where: Condition placed after ``WHERE``.

    Raises:
        sqlite3.Error: If SQLite rejects the statement.

    Note:
        ``table_name`` and ``where`` are interpolated into the SQL text
        as-is, so they must come from a trusted source.
    """
    conn = sqlite3.connect(db_file)
    try:
        conn.execute(f'DELETE FROM {table_name} WHERE {where}')
        conn.commit()
    finally:
        # Close even when execute() raises so the handle is not leaked.
        conn.close()
python
import sqlite3
def drop_table(db_file: str, table_name: str) -> None:
    """Drop a table from the database.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Table to drop. Interpolated into the SQL text as-is,
            so it must come from a trusted source.

    Raises:
        sqlite3.Error: If the statement fails, including when the table
            does not exist (no ``IF EXISTS`` is used).
    """
    conn = sqlite3.connect(db_file)
    try:
        conn.execute(f'DROP TABLE {table_name}')
        conn.commit()
    finally:
        # Close even when execute() raises so the handle is not leaked.
        conn.close()
python
import sqlite3
def create_database(db_file: str) -> None:
    """Open a connection to ``db_file`` and close it again straight away.

    No statements are executed; SQLite creates the file on open when it is
    not already present.
    """
    sqlite3.connect(db_file).close()
python
import os
def delete_database(db_file: str) -> None:
    """Remove the database file at ``db_file`` from disk.

    Any ``OSError`` raised by the removal (for example when the file does
    not exist) propagates to the caller.
    """
    # os.unlink is the same operation as os.remove under its POSIX name.
    os.unlink(db_file)
python
import sqlite3
def get_tables(db_file: str) -> list:
    """List the tables recorded in ``sqlite_master``.

    Args:
        db_file: Path of the SQLite database file.

    Returns:
        A list of 1-tuples ``(name,)``, one per table.

    Raises:
        sqlite3.Error: If the query fails.
    """
    conn = sqlite3.connect(db_file)
    try:
        cursor = conn.execute("SELECT name FROM sqlite_master WHERE type='table';")
        return cursor.fetchall()
    finally:
        # Close even when the query raises so the handle is not leaked.
        conn.close()
python
import sqlite3
def get_columns(db_file: str, table_name: str) -> list:
    """Return the ``PRAGMA table_info`` rows for a table.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Table to inspect. Interpolated into the PRAGMA text
            as-is, so it must come from a trusted source.

    Returns:
        One tuple per column, exactly as SQLite reports it
        (``cid, name, type, notnull, dflt_value, pk``). Empty when the
        table does not exist.

    Raises:
        sqlite3.Error: If the PRAGMA fails.
    """
    conn = sqlite3.connect(db_file)
    try:
        return conn.execute(f'PRAGMA table_info({table_name})').fetchall()
    finally:
        # Close even when the PRAGMA raises so the handle is not leaked.
        conn.close()
python
import sqlite3
def get_data(db_file: str, table_name: str) -> list:
    """Fetch every row of a table.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Table to read. Interpolated into the SQL text as-is,
            so it must come from a trusted source.

    Returns:
        A list of row tuples as produced by ``fetchall()``.

    Raises:
        sqlite3.Error: If the query fails (e.g. the table does not exist).
    """
    conn = sqlite3.connect(db_file)
    try:
        return conn.execute(f'SELECT * FROM {table_name}').fetchall()
    finally:
        # Close even when the query raises so the handle is not leaked.
        conn.close()
python
import sqlite3
def get_row_count(db_file: str, table_name: str) -> int:
    """Count the rows of a table with ``SELECT COUNT(*)``.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Table to count. Interpolated into the SQL text as-is,
            so it must come from a trusted source.

    Returns:
        The number of rows in the table.

    Raises:
        sqlite3.Error: If the query fails (e.g. the table does not exist).
    """
    conn = sqlite3.connect(db_file)
    try:
        (row_count,) = conn.execute(f'SELECT COUNT(*) FROM {table_name}').fetchone()
        return row_count
    finally:
        # Close even when the query raises so the handle is not leaked.
        conn.close()
python
import sqlite3
def get_column_count(db_file: str, table_name: str) -> int:
    """Count the columns of a table via ``PRAGMA table_info``.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Table to inspect. Interpolated into the PRAGMA text
            as-is, so it must come from a trusted source.

    Returns:
        The number of rows the PRAGMA reports, i.e. one per column; ``0``
        when the table does not exist.

    Raises:
        sqlite3.Error: If the PRAGMA fails.
    """
    conn = sqlite3.connect(db_file)
    try:
        return len(conn.execute(f'PRAGMA table_info({table_name})').fetchall())
    finally:
        # Close even when the PRAGMA raises so the handle is not leaked.
        conn.close()
python
import sqlite3
def get_table_size(db_file: str, table_name: str) -> int:
    """Return the number of cells in a table (row count times column count).

    Args:
        db_file: Path of the SQLite database file.
        table_name: Table to measure. Interpolated into the SQL text
            as-is, so it must come from a trusted source.

    Returns:
        ``COUNT(*)`` multiplied by the number of columns reported by
        ``PRAGMA table_info``.

    Raises:
        sqlite3.Error: If a statement fails (e.g. the table does not exist).
    """
    conn = sqlite3.connect(db_file)
    try:
        cursor = conn.cursor()
        (row_count,) = cursor.execute(f'SELECT COUNT(*) FROM {table_name}').fetchone()
        column_count = len(cursor.execute(f'PRAGMA table_info({table_name})').fetchall())
        return row_count * column_count
    finally:
        # Close even when a statement raises so the handle is not leaked.
        conn.close()
python
import sqlite3
def get_database_size(db_file: str) -> int:
    """Return the total number of cells across every table in the database.

    For each table listed in ``sqlite_master`` the row count is multiplied
    by the column count, and the per-table products are summed.

    Args:
        db_file: Path of the SQLite database file.

    Returns:
        The sum over all tables of ``rows * columns``; ``0`` for a database
        without tables.

    Raises:
        sqlite3.Error: If a statement fails.
    """
    conn = sqlite3.connect(db_file)
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
        table_names = [row[0] for row in cursor.fetchall()]

        total = 0
        for name in table_names:
            # Names come straight from the catalogue and may contain spaces
            # or quotes, so quote them as identifiers before interpolating.
            quoted = '"' + name.replace('"', '""') + '"'
            (row_count,) = cursor.execute(f'SELECT COUNT(*) FROM {quoted}').fetchone()
            column_count = len(cursor.execute(f'PRAGMA table_info({quoted})').fetchall())
            # Add this table's own product; multiplying the running total
            # by the column count would compound across tables.
            total += row_count * column_count
        return total
    finally:
        # Close even when a statement raises so the handle is not leaked.
        conn.close()
python
import sqlite3
def get_table_names(db_file: str) -> list:
    """List the table names recorded in ``sqlite_master``.

    Args:
        db_file: Path of the SQLite database file.

    Returns:
        A list of 1-tuples ``(name,)``, one per table.
        NOTE(review): the rows are returned exactly as fetched rather than
        as bare strings; use ``row[0]`` to get each name.

    Raises:
        sqlite3.Error: If the query fails.
    """
    conn = sqlite3.connect(db_file)
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
        return cursor.fetchall()
    finally:
        # Close even when the query raises so the handle is not leaked.
        conn.close()
python
import sqlite3
def get_column_names(db_file: str, table_name: str) -> list:
    """Return the column descriptions of a table from ``PRAGMA table_info``.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Table to inspect. Interpolated into the PRAGMA text
            as-is, so it must come from a trusted source.

    Returns:
        One tuple per column, exactly as SQLite reports it
        (``cid, name, type, notnull, dflt_value, pk``).
        NOTE(review): despite the function name, complete rows are
        returned; the column name is element ``1`` of each row.

    Raises:
        sqlite3.Error: If the PRAGMA fails.
    """
    conn = sqlite3.connect(db_file)
    try:
        cursor = conn.cursor()
        cursor.execute(f'PRAGMA table_info({table_name})')
        return cursor.fetchall()
    finally:
        # Close even when the PRAGMA raises so the handle is not leaked.
        conn.close()
python
import sqlite3
def get_column_types(db_file: str, table_name: str) -> list:
    """Return the column descriptions of a table from ``PRAGMA table_info``.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Table to inspect. Interpolated into the PRAGMA text
            as-is, so it must come from a trusted source.

    Returns:
        One tuple per column, exactly as SQLite reports it
        (``cid, name, type, notnull, dflt_value, pk``).
        NOTE(review): despite the function name, complete rows are
        returned; the declared type is element ``2`` of each row.

    Raises:
        sqlite3.Error: If the PRAGMA fails.
    """
    conn = sqlite3.connect(db_file)
    try:
        cursor = conn.cursor()
        cursor.execute(f'PRAGMA table_info({table_name})')
        return cursor.fetchall()
    finally:
        # Close even when the PRAGMA raises so the handle is not leaked.
        conn.close()
python
import sqlite3
def get_column_type(db_file: str, table_name: str, column_name: str) -> str:
    """Look up the declared type of one column by name.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Table to inspect. Interpolated into the PRAGMA text
            as-is, so it must come from a trusted source.
        column_name: Exact (case-sensitive) column name to look for.

    Returns:
        The declared type of the first matching column, or ``''`` when no
        column has that name.

    Raises:
        sqlite3.Error: If the PRAGMA fails.
    """
    conn = sqlite3.connect(db_file)
    try:
        columns = conn.execute(f'PRAGMA table_info({table_name})').fetchall()
    finally:
        # Close even when the PRAGMA raises so the handle is not leaked.
        conn.close()

    # table_info rows carry the name at index 1 and the type at index 2.
    return next((col[2] for col in columns if col[1] == column_name), '')
python
import sqlite3
def get_column_type_by_index(db_file: str, table_name: str, column_index: int) -> str:
    """Look up the declared type of the column at a list position.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Table to inspect. Interpolated into the PRAGMA text
            as-is, so it must come from a trusted source.
        column_index: Position in the list of ``PRAGMA table_info`` rows;
            normal Python list indexing applies.

    Returns:
        The declared type of the column at that position.

    Raises:
        IndexError: If ``column_index`` is outside the list of columns.
        sqlite3.Error: If the PRAGMA fails.
    """
    conn = sqlite3.connect(db_file)
    try:
        columns = conn.execute(f'PRAGMA table_info({table_name})').fetchall()
    finally:
        # Close even when the PRAGMA raises so the handle is not leaked.
        conn.close()

    # table_info rows carry the declared type at index 2.
    return columns[column_index][2]
python
import sqlite3
def get_column_index(db_file: str, table_name: str, column_name: str) -> int:
    """Find the column id (``cid``) of a column by name.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Table to inspect. Interpolated into the PRAGMA text
            as-is, so it must come from a trusted source.
        column_name: Exact (case-sensitive) column name to look for.

    Returns:
        The ``cid`` of the first matching column, or ``-1`` when no column
        has that name.

    Raises:
        sqlite3.Error: If the PRAGMA fails.
    """
    conn = sqlite3.connect(db_file)
    try:
        columns = conn.execute(f'PRAGMA table_info({table_name})').fetchall()
    finally:
        # Close even when the PRAGMA raises so the handle is not leaked.
        conn.close()

    # table_info rows carry the cid at index 0 and the name at index 1.
    return next((col[0] for col in columns if col[1] == column_name), -1)
python
import sqlite3
def get_column_name(db_file: str, table_name: str, column_index: int) -> str:
    """Look up the name of the column at a list position.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Table to inspect. Interpolated into the PRAGMA text
            as-is, so it must come from a trusted source.
        column_index: Position in the list of ``PRAGMA table_info`` rows;
            normal Python list indexing applies.

    Returns:
        The name of the column at that position.

    Raises:
        IndexError: If ``column_index`` is outside the list of columns.
        sqlite3.Error: If the PRAGMA fails.
    """
    conn = sqlite3.connect(db_file)
    try:
        columns = conn.execute(f'PRAGMA table_info({table_name})').fetchall()
    finally:
        # Close even when the PRAGMA raises so the handle is not leaked.
        conn.close()

    # table_info rows carry the column name at index 1.
    return columns[column_index][1]
python
import sqlite3
def get_column_names_by_type(db_file: str, table_name: str, column_type: str) -> list:
    """List the names of all columns declared with a given type.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Table to inspect. Interpolated into the PRAGMA text
            as-is, so it must come from a trusted source.
        column_type: Declared type to match; compared for exact string
            equality with what ``PRAGMA table_info`` reports.

    Returns:
        The matching column names in table order; empty when none match.

    Raises:
        sqlite3.Error: If the PRAGMA fails.
    """
    conn = sqlite3.connect(db_file)
    try:
        columns = conn.execute(f'PRAGMA table_info({table_name})').fetchall()
    finally:
        # Close even when the PRAGMA raises so the handle is not leaked.
        conn.close()

    # table_info rows carry the name at index 1 and the type at index 2.
    return [col[1] for col in columns if col[2] == column_type]
python
import sqlite3
def get_column_indexes_by_type(db_file: str, table_name: str, column_type: str) -> list:
    """List the column ids (``cid``) of all columns declared with a given type.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Table to inspect. Interpolated into the PRAGMA text
            as-is, so it must come from a trusted source.
        column_type: Declared type to match; compared for exact string
            equality with what ``PRAGMA table_info`` reports.

    Returns:
        The matching ``cid`` values in table order; empty when none match.

    Raises:
        sqlite3.Error: If the PRAGMA fails.
    """
    conn = sqlite3.connect(db_file)
    try:
        columns = conn.execute(f'PRAGMA table_info({table_name})').fetchall()
    finally:
        # Close even when the PRAGMA raises so the handle is not leaked.
        conn.close()

    # table_info rows carry the cid at index 0 and the type at index 2.
    return [col[0] for col in columns if col[2] == column_type]
python
import sqlite3
def get_column_names_by_index(db_file: str, table_name: str, column_index: int) -> list:
    """List the names of the columns whose column id (``cid``) equals an index.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Table to inspect. Interpolated into the PRAGMA text
            as-is, so it must come from a trusted source.
        column_index: ``cid`` value to match.

    Returns:
        The names of the matching columns; empty when no column has that
        ``cid``.

    Raises:
        sqlite3.Error: If the PRAGMA fails.
    """
    conn = sqlite3.connect(db_file)
    try:
        columns = conn.execute(f'PRAGMA table_info({table_name})').fetchall()
    finally:
        # Close even when the PRAGMA raises so the handle is not leaked.
        conn.close()

    # table_info rows carry the cid at index 0 and the name at index 1.
    return [col[1] for col in columns if col[0] == column_index]
python
import sqlite3
import pandas as pd
def upload_to_sqlite(
    db_file: str,
    table_name: str,
    data: pd.DataFrame,
    if_exists: str = 'fail',
    index: bool = False,
    index_label: str = None,
    chunksize: int = None,
    dtype: dict = None,
):
    """Write a DataFrame to a SQLite table via ``DataFrame.to_sql``.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Name of the destination table.
        data: Frame to write.
        if_exists: Passed through to ``to_sql``.
        index: Passed through to ``to_sql``.
        index_label: Passed through to ``to_sql``.
        chunksize: Passed through to ``to_sql``.
        dtype: Passed through to ``to_sql``.

    Raises:
        Exception: Whatever ``to_sql`` raises (for instance ``ValueError``
            when the table exists and ``if_exists='fail'``) propagates
            unchanged.
    """
    conn = sqlite3.connect(db_file)
    try:
        data.to_sql(
            table_name,
            conn,
            if_exists=if_exists,
            index=index,
            index_label=index_label,
            chunksize=chunksize,
            dtype=dtype,
        )
    finally:
        # to_sql raises on an existing table with if_exists='fail'; close
        # the connection on that path too instead of leaking it.
        conn.close()
python
import sqlite3
import pandas as pd
def upload_to_sqlite_from_csv(
    db_file: str,
    table_name: str,
    csv_file: str,
    if_exists: str = 'fail',
    index: bool = False,
    index_label: str = None,
    chunksize: int = None,
    dtype: dict = None,
):
    """Load a CSV file with pandas and write it to a SQLite table.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Name of the destination table.
        csv_file: Path of the CSV file, read with ``pd.read_csv`` defaults.
        if_exists: Passed through to ``to_sql``.
        index: Passed through to ``to_sql``.
        index_label: Passed through to ``to_sql``.
        chunksize: Passed through to ``to_sql``.
        dtype: Passed through to ``to_sql``.

    Raises:
        Exception: Errors from ``pd.read_csv`` or ``to_sql`` propagate
            unchanged.
    """
    # Read the CSV before touching the database: a missing or malformed
    # file then neither leaks a connection nor creates an empty db file.
    data = pd.read_csv(csv_file)

    conn = sqlite3.connect(db_file)
    try:
        data.to_sql(
            table_name,
            conn,
            if_exists=if_exists,
            index=index,
            index_label=index_label,
            chunksize=chunksize,
            dtype=dtype,
        )
    finally:
        # Close even when to_sql raises so the handle is not leaked.
        conn.close()
python
import sqlite3
import pandas as pd
def upload_to_sqlite_from_excel(
    db_file: str,
    table_name: str,
    excel_file: str,
    sheet_name: str = 0,
    if_exists: str = 'fail',
    index: bool = False,
    index_label: str = None,
    chunksize: int = None,
    dtype: dict = None,
):
    """Load an Excel sheet with pandas and write it to a SQLite table.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Name of the destination table.
        excel_file: Path of the workbook, read with ``pd.read_excel``.
        sheet_name: Passed through to ``pd.read_excel``; the default ``0``
            is a sheet position, not a name.
        if_exists: Passed through to ``to_sql``.
        index: Passed through to ``to_sql``.
        index_label: Passed through to ``to_sql``.
        chunksize: Passed through to ``to_sql``.
        dtype: Passed through to ``to_sql``.

    Raises:
        Exception: Errors from ``pd.read_excel`` or ``to_sql`` propagate
            unchanged.
    """
    # Read the workbook before touching the database: a missing or
    # unreadable file then neither leaks a connection nor creates an empty
    # db file.
    data = pd.read_excel(excel_file, sheet_name=sheet_name)

    conn = sqlite3.connect(db_file)
    try:
        data.to_sql(
            table_name,
            conn,
            if_exists=if_exists,
            index=index,
            index_label=index_label,
            chunksize=chunksize,
            dtype=dtype,
        )
    finally:
        # Close even when to_sql raises so the handle is not leaked.
        conn.close()