Skip to content
On this page

Python SQLite

Please read the Python SQLite library documentation. The following snippets are for my own reference.

python
import sqlite3

def create_table(db_file: str = 'database.db', table_name: str = 'table_name', column_names: list = None, column_types: list = None) -> None:
    """Create a table in an SQLite database if it does not already exist.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Name of the table to create.
        column_names: Column names. Defaults to ``['id', 'name']``.
        column_types: Type/constraint text for each column, matched to
            ``column_names`` by position. Defaults to
            ``['INTEGER PRIMARY KEY', 'TEXT']``.

    Raises:
        IndexError: If ``column_types`` is shorter than ``column_names``.
        sqlite3.Error: If SQLite rejects the generated statement.

    Note:
        The table name, column names and column types are interpolated into
        the SQL text verbatim, so they must come from trusted code.
    """
    # None sentinels avoid sharing mutable default lists between calls.
    if column_names is None:
        column_names = ['id', 'name']
    if column_types is None:
        column_types = ['INTEGER PRIMARY KEY', 'TEXT']
    columns = ', '.join(
        f'{name} {column_types[i]}' for i, name in enumerate(column_names)
    )
    sql = f'CREATE TABLE IF NOT EXISTS {table_name} ({columns})'
    conn = sqlite3.connect(db_file)
    try:
        conn.execute(sql)
        conn.commit()
    finally:
        # Close the connection even when the statement fails.
        conn.close()
python
import sqlite3

def insert_data(db_file: str, table_name: str, data: list) -> None:
    """Insert a single row into a table.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Name of the target table (interpolated verbatim into the
            SQL text, so it must be trusted).
        data: Values for the row, one per table column, bound as ``?``
            parameters. An empty sequence inserts a row of column defaults.

    Raises:
        sqlite3.Error: If SQLite rejects the statement or the values.
    """
    value_count = len(data)
    if value_count:
        placeholders = ', '.join('?' * value_count)
        sql = f'INSERT INTO {table_name} VALUES ({placeholders})'
    else:
        # "VALUES ()" is not valid SQL; DEFAULT VALUES is the SQLite spelling
        # for inserting a row without supplying any value.
        sql = f'INSERT INTO {table_name} DEFAULT VALUES'
    conn = sqlite3.connect(db_file)
    try:
        conn.execute(sql, data)
        conn.commit()
    finally:
        # Close the connection even when the insert fails.
        conn.close()
python
import sqlite3

def select_data(db_file: str, table_name: str, column_names: list = None, where: str = '') -> list:
    """Fetch rows from a table.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Name of the table to read.
        column_names: Columns (or SQL expressions) to select. ``None`` or an
            empty list selects every column (``*``).
        where: Optional SQL condition placed after ``WHERE``; an empty value
            means no filtering.

    Returns:
        A list of row tuples as returned by ``fetchall()``.

    Raises:
        sqlite3.Error: If SQLite rejects the generated statement.

    Note:
        ``table_name``, ``column_names`` and ``where`` are interpolated into
        the SQL text verbatim, so they must come from trusted code.
    """
    # None sentinel avoids a shared mutable default; an empty list falls back
    # to '*' instead of producing a malformed statement.
    columns = ', '.join(f'{name}' for name in column_names) if column_names else '*'
    sql = f'SELECT {columns} FROM {table_name}'
    if where:
        sql += f' WHERE {where}'
    conn = sqlite3.connect(db_file)
    try:
        return conn.execute(sql).fetchall()
    finally:
        # Close the connection even when the query fails.
        conn.close()
python
import sqlite3

def update_data(db_file: str, table_name: str, column_names: list, data: list, where: str) -> None:
    """Update columns of the rows matching a condition.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Name of the table to update.
        column_names: Columns to assign; each becomes ``column = ?``.
        data: Values bound to the ``?`` placeholders of the whole statement,
            in order: one per entry of ``column_names``, followed by values
            for any ``?`` used inside ``where``.
        where: SQL condition placed after ``WHERE``.

    Raises:
        sqlite3.Error: If SQLite rejects the statement or the bound values.

    Note:
        ``table_name``, ``column_names`` and ``where`` are interpolated into
        the SQL text verbatim, so they must come from trusted code.
    """
    assignments = ', '.join(f'{name} = ?' for name in column_names)
    sql = f'UPDATE {table_name} SET {assignments} WHERE {where}'
    conn = sqlite3.connect(db_file)
    try:
        conn.execute(sql, data)
        conn.commit()
    finally:
        # Close the connection even when the update fails.
        conn.close()
python
import sqlite3

def delete_data(db_file: str, table_name: str, where: str) -> None:
    """Delete the rows of a table that match a condition.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Name of the table to delete from.
        where: SQL condition placed after ``WHERE``.

    Raises:
        sqlite3.Error: If SQLite rejects the generated statement.

    Note:
        ``table_name`` and ``where`` are interpolated into the SQL text
        verbatim, so they must come from trusted code.
    """
    conn = sqlite3.connect(db_file)
    try:
        conn.execute(f'DELETE FROM {table_name} WHERE {where}')
        conn.commit()
    finally:
        # Close the connection even when the delete fails.
        conn.close()
python
import sqlite3

def drop_table(db_file: str, table_name: str) -> None:
    """Drop a table from an SQLite database.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Name of the table to drop (interpolated verbatim into
            the SQL text, so it must be trusted).

    Raises:
        sqlite3.OperationalError: If the table does not exist.
    """
    conn = sqlite3.connect(db_file)
    try:
        conn.execute(f'DROP TABLE {table_name}')
        conn.commit()
    finally:
        # Dropping a missing table raises; still release the connection.
        conn.close()
python
import sqlite3

def create_database(db_file: str) -> None:
    """Open a connection to ``db_file`` and close it again immediately.

    Connecting is what makes SQLite create the database file when it is
    missing; nothing is written to an existing database.
    """
    sqlite3.connect(db_file).close()
python
import os

def delete_database(db_file: str) -> None:
    """Delete the database file at ``db_file`` from disk.

    Raises:
        FileNotFoundError: If the file does not exist.
    """
    # os.unlink is the same operation as os.remove under its Unix name.
    os.unlink(db_file)
python
import sqlite3

def get_tables(db_file: str) -> list:
    """List the tables recorded in the database's ``sqlite_master`` catalog.

    Args:
        db_file: Path of the SQLite database file.

    Returns:
        A list of 1-tuples ``(name,)``, one per table, as returned by
        ``fetchall()``.

    Raises:
        sqlite3.Error: If the file cannot be read as an SQLite database.
    """
    conn = sqlite3.connect(db_file)
    try:
        return conn.execute(
            "SELECT name FROM sqlite_master WHERE type='table';"
        ).fetchall()
    finally:
        # Close the connection even when the query fails.
        conn.close()
python
import sqlite3

def get_columns(db_file: str, table_name: str) -> list:
    """Return the ``PRAGMA table_info`` rows describing a table's columns.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Name of the table to inspect (interpolated verbatim into
            the PRAGMA text, so it must be trusted).

    Returns:
        One tuple per column in SQLite's ``table_info`` layout
        ``(cid, name, type, notnull, dflt_value, pk)``; an empty list when
        the table does not exist.
    """
    conn = sqlite3.connect(db_file)
    try:
        return conn.execute(f'PRAGMA table_info({table_name})').fetchall()
    finally:
        # Close the connection even when the PRAGMA fails.
        conn.close()
python
import sqlite3

def get_data(db_file: str, table_name: str) -> list:
    """Fetch every row of a table.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Name of the table to read (interpolated verbatim into
            the SQL text, so it must be trusted).

    Returns:
        A list of row tuples as returned by ``fetchall()``.

    Raises:
        sqlite3.OperationalError: If the table does not exist.
    """
    conn = sqlite3.connect(db_file)
    try:
        return conn.execute(f'SELECT * FROM {table_name}').fetchall()
    finally:
        # Close the connection even when the query fails.
        conn.close()
python
import sqlite3

def get_row_count(db_file: str, table_name: str) -> int:
    """Count the rows of a table.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Name of the table to count (interpolated verbatim into
            the SQL text, so it must be trusted).

    Returns:
        The result of ``SELECT COUNT(*)`` for the table.

    Raises:
        sqlite3.OperationalError: If the table does not exist.
    """
    conn = sqlite3.connect(db_file)
    try:
        return conn.execute(f'SELECT COUNT(*) FROM {table_name}').fetchone()[0]
    finally:
        # Close the connection even when the query fails.
        conn.close()
python
import sqlite3

def get_column_count(db_file: str, table_name: str) -> int:
    """Count the columns of a table.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Name of the table to inspect (interpolated verbatim into
            the PRAGMA text, so it must be trusted).

    Returns:
        The number of rows reported by ``PRAGMA table_info``; ``0`` when the
        table does not exist.
    """
    conn = sqlite3.connect(db_file)
    try:
        return len(conn.execute(f'PRAGMA table_info({table_name})').fetchall())
    finally:
        # Close the connection even when the PRAGMA fails.
        conn.close()
python
import sqlite3

def get_table_size(db_file: str, table_name: str) -> int:
    """Return the number of cells in a table (row count times column count).

    Args:
        db_file: Path of the SQLite database file.
        table_name: Name of the table to measure (interpolated verbatim into
            the SQL text, so it must be trusted).

    Returns:
        ``COUNT(*)`` multiplied by the number of ``PRAGMA table_info`` rows.

    Raises:
        sqlite3.OperationalError: If the table does not exist.
    """
    conn = sqlite3.connect(db_file)
    try:
        cur = conn.cursor()
        row_count = cur.execute(f'SELECT COUNT(*) FROM {table_name}').fetchone()[0]
        column_count = len(cur.execute(f'PRAGMA table_info({table_name})').fetchall())
        return row_count * column_count
    finally:
        # Close the connection even when one of the queries fails.
        conn.close()
python
import sqlite3

def get_database_size(db_file: str) -> int:
    """Return the total number of cells across all tables of a database.

    For every table listed in ``sqlite_master`` the row count is multiplied
    by the column count, and the per-table products are summed.

    Args:
        db_file: Path of the SQLite database file.

    Returns:
        The sum of ``rows * columns`` over all tables; ``0`` for a database
        without tables.

    Raises:
        sqlite3.Error: If the file cannot be read as an SQLite database.
    """
    conn = sqlite3.connect(db_file)
    try:
        cur = conn.cursor()
        cur.execute("SELECT name FROM sqlite_master WHERE type='table';")
        total = 0
        for (name,) in cur.fetchall():
            # Names come straight from the catalog and may contain spaces or
            # quotes, so quote them as SQL identifiers.
            quoted = '"' + name.replace('"', '""') + '"'
            row_count = cur.execute(f'SELECT COUNT(*) FROM {quoted}').fetchone()[0]
            column_count = len(cur.execute(f'PRAGMA table_info({quoted})').fetchall())
            # Add this table's own product; multiplying the running total
            # would compound the sizes of previously visited tables.
            total += row_count * column_count
        return total
    finally:
        # Close the connection even when one of the queries fails.
        conn.close()
python
import sqlite3

def get_table_names(db_file: str) -> list:
    """List the table names recorded in ``sqlite_master``.

    Args:
        db_file: Path of the SQLite database file.

    Returns:
        A list of 1-tuples ``(name,)`` exactly as returned by ``fetchall()``;
        the names are not unwrapped into plain strings.

    Raises:
        sqlite3.Error: If the file cannot be read as an SQLite database.
    """
    conn = sqlite3.connect(db_file)
    try:
        cur = conn.cursor()
        cur.execute("SELECT name FROM sqlite_master WHERE type='table';")
        return cur.fetchall()
    finally:
        # Close the connection even when the query fails.
        conn.close()
python
import sqlite3

def get_column_names(db_file: str, table_name: str) -> list:
    """Return column metadata for a table via ``PRAGMA table_info``.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Name of the table to inspect (interpolated verbatim into
            the PRAGMA text, so it must be trusted).

    Returns:
        The full ``table_info`` rows
        ``(cid, name, type, notnull, dflt_value, pk)``, not bare names; the
        column name is element ``1`` of each tuple. Empty when the table
        does not exist.
    """
    conn = sqlite3.connect(db_file)
    try:
        cur = conn.cursor()
        cur.execute(f'PRAGMA table_info({table_name})')
        return cur.fetchall()
    finally:
        # Close the connection even when the PRAGMA fails.
        conn.close()
python
import sqlite3

def get_column_types(db_file: str, table_name: str) -> list:
    """Return column metadata for a table via ``PRAGMA table_info``.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Name of the table to inspect (interpolated verbatim into
            the PRAGMA text, so it must be trusted).

    Returns:
        The full ``table_info`` rows
        ``(cid, name, type, notnull, dflt_value, pk)``, not bare types; the
        declared type is element ``2`` of each tuple. Empty when the table
        does not exist.
    """
    conn = sqlite3.connect(db_file)
    try:
        table_info = conn.execute(f'PRAGMA table_info({table_name})')
        return table_info.fetchall()
    finally:
        # Close the connection even when the PRAGMA fails.
        conn.close()
python
import sqlite3

def get_column_type(db_file: str, table_name: str, column_name: str) -> str:
    """Look up the declared type of a named column.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Name of the table to inspect (interpolated verbatim into
            the PRAGMA text, so it must be trusted).
        column_name: Exact (case-sensitive) column name to find.

    Returns:
        The declared type of the first matching column, or ``''`` when no
        column has that name.
    """
    conn = sqlite3.connect(db_file)
    try:
        columns = conn.execute(f'PRAGMA table_info({table_name})').fetchall()
    finally:
        # Close the connection even when the PRAGMA fails.
        conn.close()
    # table_info rows are (cid, name, type, ...).
    for _, name, declared_type, *_rest in columns:
        if name == column_name:
            return declared_type
    return ''
python
import sqlite3

def get_column_type_by_index(db_file: str, table_name: str, column_index: int) -> str:
    """Return the declared type of the column at a given position.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Name of the table to inspect (interpolated verbatim into
            the PRAGMA text, so it must be trusted).
        column_index: Position in the ``PRAGMA table_info`` result list;
            regular list indexing applies.

    Returns:
        The declared type of the selected column.

    Raises:
        IndexError: If ``column_index`` is out of range, including when the
            table does not exist.
    """
    conn = sqlite3.connect(db_file)
    try:
        columns = conn.execute(f'PRAGMA table_info({table_name})').fetchall()
    finally:
        # Close the connection even when the PRAGMA fails.
        conn.close()
    # table_info rows are (cid, name, type, ...).
    return columns[column_index][2]
python
import sqlite3

def get_column_index(db_file: str, table_name: str, column_name: str) -> int:
    """Find the column id (``cid``) of a named column.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Name of the table to inspect (interpolated verbatim into
            the PRAGMA text, so it must be trusted).
        column_name: Exact (case-sensitive) column name to find.

    Returns:
        The ``cid`` reported by ``PRAGMA table_info`` for the first matching
        column, or ``-1`` when no column has that name.
    """
    conn = sqlite3.connect(db_file)
    try:
        columns = conn.execute(f'PRAGMA table_info({table_name})').fetchall()
    finally:
        # Close the connection even when the PRAGMA fails.
        conn.close()
    # table_info rows are (cid, name, ...).
    for cid, name, *_rest in columns:
        if name == column_name:
            return cid
    return -1
python
import sqlite3

def get_column_name(db_file: str, table_name: str, column_index: int) -> str:
    """Return the name of the column at a given position.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Name of the table to inspect (interpolated verbatim into
            the PRAGMA text, so it must be trusted).
        column_index: Position in the ``PRAGMA table_info`` result list;
            regular list indexing applies.

    Returns:
        The name of the selected column.

    Raises:
        IndexError: If ``column_index`` is out of range, including when the
            table does not exist.
    """
    conn = sqlite3.connect(db_file)
    try:
        columns = conn.execute(f'PRAGMA table_info({table_name})').fetchall()
    finally:
        # Close the connection even when the PRAGMA fails.
        conn.close()
    # table_info rows are (cid, name, ...).
    return columns[column_index][1]
python
import sqlite3

def get_column_names_by_type(db_file: str, table_name: str, column_type: str) -> list:
    """List the names of all columns declared with a given type.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Name of the table to inspect (interpolated verbatim into
            the PRAGMA text, so it must be trusted).
        column_type: Declared type to match; the comparison is an exact,
            case-sensitive string equality.

    Returns:
        The matching column names in table order; empty when none match.
    """
    conn = sqlite3.connect(db_file)
    try:
        columns = conn.execute(f'PRAGMA table_info({table_name})').fetchall()
    finally:
        # Close the connection even when the PRAGMA fails.
        conn.close()
    # table_info rows are (cid, name, type, ...).
    return [column[1] for column in columns if column[2] == column_type]
python
import sqlite3

def get_column_indexes_by_type(db_file: str, table_name: str, column_type: str) -> list:
    """List the column ids (``cid``) of all columns declared with a given type.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Name of the table to inspect (interpolated verbatim into
            the PRAGMA text, so it must be trusted).
        column_type: Declared type to match; the comparison is an exact,
            case-sensitive string equality.

    Returns:
        The ``cid`` values of the matching columns in table order; empty
        when none match.
    """
    conn = sqlite3.connect(db_file)
    try:
        columns = conn.execute(f'PRAGMA table_info({table_name})').fetchall()
    finally:
        # Close the connection even when the PRAGMA fails.
        conn.close()
    # table_info rows are (cid, name, type, ...).
    return [column[0] for column in columns if column[2] == column_type]
python
import sqlite3

def get_column_names_by_index(db_file: str, table_name: str, column_index: int) -> list:
    """List the names of the columns whose column id equals ``column_index``.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Name of the table to inspect (interpolated verbatim into
            the PRAGMA text, so it must be trusted).
        column_index: Column id (``cid``) to match against
            ``PRAGMA table_info``.

    Returns:
        A list with the matching column names; empty when no column has that
        id.
    """
    conn = sqlite3.connect(db_file)
    try:
        columns = conn.execute(f'PRAGMA table_info({table_name})').fetchall()
    finally:
        # Close the connection even when the PRAGMA fails.
        conn.close()
    # table_info rows are (cid, name, ...).
    return [column[1] for column in columns if column[0] == column_index]
python
import sqlite3
import pandas as pd

def upload_to_sqlite(db_file: str, table_name: str, data: pd.DataFrame, if_exists: str = 'fail', index: bool = False, index_label: str = None, chunksize: int = None, dtype: dict = None) -> None:
    """Write a DataFrame to a table of an SQLite database.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Name of the destination table.
        data: DataFrame to store.
        if_exists, index, index_label, chunksize, dtype: Forwarded unchanged
            to ``DataFrame.to_sql``.

    Raises:
        ValueError: From ``DataFrame.to_sql`` when the table already exists
            and ``if_exists`` is ``'fail'``.
    """
    conn = sqlite3.connect(db_file)
    try:
        data.to_sql(table_name, conn, if_exists=if_exists, index=index, index_label=index_label, chunksize=chunksize, dtype=dtype)
    finally:
        # The default if_exists='fail' raises on an existing table; make
        # sure the connection is released on that path too.
        conn.close()
python
import sqlite3
import pandas as pd

def upload_to_sqlite_from_csv(db_file: str, table_name: str, csv_file: str, if_exists: str = 'fail', index: bool = False, index_label: str = None, chunksize: int = None, dtype: dict = None) -> None:
    """Load a CSV file with pandas and write it to an SQLite table.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Name of the destination table.
        csv_file: Path of the CSV file, read with ``pd.read_csv`` defaults.
        if_exists, index, index_label, chunksize, dtype: Forwarded unchanged
            to ``DataFrame.to_sql``.

    Raises:
        FileNotFoundError: If ``csv_file`` does not exist.
        ValueError: From ``DataFrame.to_sql`` when the table already exists
            and ``if_exists`` is ``'fail'``.
    """
    conn = sqlite3.connect(db_file)
    try:
        frame = pd.read_csv(csv_file)
        frame.to_sql(table_name, conn, if_exists=if_exists, index=index, index_label=index_label, chunksize=chunksize, dtype=dtype)
    finally:
        # Reading or writing may fail; release the connection either way.
        conn.close()
python
import sqlite3
import pandas as pd

def upload_to_sqlite_from_excel(db_file: str, table_name: str, excel_file: str, sheet_name: 'int | str' = 0, if_exists: str = 'fail', index: bool = False, index_label: str = None, chunksize: int = None, dtype: dict = None) -> None:
    """Load an Excel sheet with pandas and write it to an SQLite table.

    Args:
        db_file: Path of the SQLite database file.
        table_name: Name of the destination table.
        excel_file: Path of the Excel workbook.
        sheet_name: Sheet to read, forwarded to ``pd.read_excel``; the
            default ``0`` selects the first sheet.
        if_exists, index, index_label, chunksize, dtype: Forwarded unchanged
            to ``DataFrame.to_sql``.

    Raises:
        ValueError: From ``DataFrame.to_sql`` when the table already exists
            and ``if_exists`` is ``'fail'``.
    """
    conn = sqlite3.connect(db_file)
    try:
        frame = pd.read_excel(excel_file, sheet_name=sheet_name)
        frame.to_sql(table_name, conn, if_exists=if_exists, index=index, index_label=index_label, chunksize=chunksize, dtype=dtype)
    finally:
        # Reading or writing may fail; release the connection either way.
        conn.close()