沪ICP备19023181号-1
Collect from网页模板
Modified by Sprite小站

Postgres更新数据库大数据表

需求

  • 对数据多个字段加密
  • 数据量：约 18w（18 万）条

解决

包括两个方面

应用程序

兼容密文和明文数据：对同一数据重复加密、对明文执行解密都不能报错。做法是给密文添加前缀，程序据此区分密文与明文。

已存在数据更新

  • 简单方法：直接更新原表。由于第 1 点已保证应用程序兼容明文和密文，更新过程中表内同时存在明文和密文也不会报错。
import os
import subprocess

import arrow
from sqlalchemy import create_engine, text
from sqlalchemy.orm import scoped_session, sessionmaker, Session

from nomad_lib.utils import json_dumps_unicode
from nomad_lib.utils.encryption import encrypt, decrypt

# Connection URL read from the environment,
# e.g. "postgresql://user:password@host:5432/db_name".
pg_sql_str = os.getenv("PG_LOGISTICS_CONN")
# Key and IV handed to encrypt()/decrypt(); None when the variables are unset.
CRYPT_KEY = os.getenv('CRYPT_KEY')
CRYPT_IV = os.getenv('CRYPT_IV')
if pg_sql_str is None:
    # Explicit check rather than `assert`, which `python -O` strips; the
    # message names the variable that is actually read.
    raise RuntimeError("PG_LOGISTICS_CONN is not found")
engine = create_engine(pg_sql_str)
# Module-wide session used by the functions in this script.
session: Session = scoped_session(sessionmaker(bind=engine))


def main():
    """Smoke-test the connection against the scratch ``test`` table.

    Inserts one fixed row into ``test`` and commits it. Then it reads every
    row and prints its ``name``, ``school`` and ``score``.
    """
    # text() is required: SQLAlchemy 2.0 rejects plain SQL strings in
    # Session.execute(). The values are bound instead of inlined.
    session.execute(
        text("INSERT INTO test(name, school, score) VALUES (:name, :school, :score)"),
        {"name": "name2", "school": "cb", "score": 22},
    )
    session.commit()
    rows = session.execute(
        text("SELECT date, name, can_drive, school, score FROM test")
    ).fetchall()
    for row in rows:
        print(
            f"{row.name} to {row.school}, {row.score} minutes."
        )


def crypt_order():
    """Run the per-row JSON field rewrite over ``table_test`` in batches.

    Rows are read ordered by ``table_test_id``, ``batch_size`` at a time.
    Pagination is keyset-based (``table_test_id > last seen id``) instead of
    OFFSET, so later batches do not rescan all earlier rows.

    For every row, ``get_sql_update_decrypt`` builds an UPDATE statement, or
    ``""`` when there is nothing to change, and that statement is executed.
    The session is committed once per batch. The loop stops at the first
    empty batch.

    NOTE(review): despite the function name, this currently calls
    ``get_sql_update_decrypt``. Switch to ``get_sql_update_encrypt`` for the
    encryption pass.

    NOTE(review): keyset pagination assumes ``table_test_id`` is unique. It is
    used as the single-row key in the generated UPDATEs; confirm this.

    Reference, a one-off manual merge into the ``ship_to`` JSONB column::

        update table_test set ship_to = ship_to::jsonb || '{"address2": "address2"}'::jsonb
        where carrier = 'test' and seller_order_ref = 'ssss'
        and ship_to->>'tin' like '1%'
    """
    batch_size = 500
    # Optional SQL condition ANDed into the row filter, e.g.
    # "table_test_id < 1000" for a trial run. An empty string means every row.
    extra_filter = ""

    processed = 0
    last_id = None
    while True:
        waybills = _fetch_waybill_batch(last_id, batch_size, extra_filter)
        if not waybills:
            break
        print("waybills processed", processed, "processing", len(waybills), arrow.now())
        for waybill in waybills:
            sql_update = get_sql_update_decrypt(
                waybill.ship_to, waybill.bill, waybill.table_test_id)
            if sql_update:
                # text() is required by SQLAlchemy 2.0. Note that it parses
                # ":name" sequences as bind parameters.
                session.execute(text(sql_update))
        session.commit()
        processed += len(waybills)
        last_id = waybills[-1].table_test_id


def _fetch_waybill_batch(last_id, batch_size, extra_filter=""):
    """Return up to ``batch_size`` table_test rows with id > ``last_id``, ordered by id.

    ``last_id=None`` returns the first batch. ``extra_filter`` is raw SQL
    ANDed into the WHERE clause; only pass trusted, developer-written text.
    """
    conditions = []
    params = {"batch_size": batch_size}
    if last_id is not None:
        conditions.append("table_test_id > :last_id")
        params["last_id"] = last_id
    if extra_filter:
        conditions.append(f"({extra_filter})")
    # Keep a trailing space so the clause never fuses with "ORDER BY".
    where_sql = f"WHERE {' AND '.join(conditions)} " if conditions else ""
    # Only the columns the caller actually uses are selected.
    sql = (
        "SELECT table_test_id, ship_to, bill FROM table_test "
        + where_sql
        + "ORDER BY table_test_id LIMIT :batch_size"
    )
    return session.execute(text(sql), params).fetchall()


def get_sql_update_encrypt(ship_to_ori: dict, bill_ori: dict, table_test_id) -> str:
    """Build an UPDATE that overwrites one row's sensitive JSON keys with encrypt() output.

    From each JSON column, only the keys ``address1`` and ``crypt_field`` with
    truthy values are taken. They are passed through ``encrypt`` (with
    ``CRYPT_KEY`` / ``CRYPT_IV``) and merged back with the JSONB ``||``
    operator, so the other keys in the column are left untouched. A column
    with no such keys is left out of the statement.

    :param ship_to_ori: current ``ship_to`` value; ``None`` is treated as empty.
    :param bill_ori: current ``bill`` value; ``None`` is treated as empty.
    :param table_test_id: key of the row to update, interpolated as-is.
    :return: the UPDATE statement, or ``""`` when neither column has a key to encrypt.
    """
    keys = ("address1", "crypt_field")

    def _pick(column):
        # Keep only the sensitive keys that carry a value.
        source = column or {}
        return {key: source[key] for key in keys if source.get(key)}

    def _jsonb_literal(fields):
        # Double single quotes so a value such as "O'Brien" cannot terminate
        # the SQL string literal early (or inject SQL).
        return json_dumps_unicode(encrypt(fields, CRYPT_KEY, CRYPT_IV)).replace("'", "''")

    assignments = []
    ship_to_f = _pick(ship_to_ori)
    if ship_to_f:
        assignments.append(
            f"ship_to = ship_to::jsonb || '{_jsonb_literal(ship_to_f)}'::jsonb")
    bill_f = _pick(bill_ori)
    if bill_f:
        assignments.append(
            f"bill = bill::jsonb || '{_jsonb_literal(bill_f)}'::jsonb")
    # Decide on the picked dicts, not on serialized JSON: "{}" is truthy, so
    # the old string-based check never skipped a row.
    if not assignments:
        return ""
    return f"update table_test set {', '.join(assignments)} where table_test_id={table_test_id} "


def get_sql_update_decrypt(ship_to_ori: dict, bill_ori: dict, table_test_id) -> str:
    """Build an UPDATE that overwrites one row's sensitive JSON keys with decrypt() output.

    From each JSON column, only the keys ``address1`` and ``crypt_field`` with
    truthy values are taken. They are passed through ``decrypt`` (with
    ``CRYPT_KEY`` / ``CRYPT_IV``) and merged back with the JSONB ``||``
    operator, so the other keys in the column are left untouched. A column
    with no such keys is left out of the statement.

    :param ship_to_ori: current ``ship_to`` value; ``None`` is treated as empty.
    :param bill_ori: current ``bill`` value; ``None`` is treated as empty.
    :param table_test_id: key of the row to update, interpolated as-is.
    :return: the UPDATE statement, or ``""`` when neither column has a key to decrypt.
    """
    keys = ("address1", "crypt_field")

    def _pick(column):
        # Keep only the sensitive keys that carry a value.
        source = column or {}
        return {key: source[key] for key in keys if source.get(key)}

    def _jsonb_literal(fields):
        # Double single quotes: decrypted plaintext such as "O'Brien" would
        # otherwise terminate the SQL string literal early (or inject SQL).
        return json_dumps_unicode(decrypt(fields, CRYPT_KEY, CRYPT_IV)).replace("'", "''")

    assignments = []
    ship_to_f = _pick(ship_to_ori)
    if ship_to_f:
        assignments.append(
            f"ship_to = ship_to::jsonb || '{_jsonb_literal(ship_to_f)}'::jsonb")
    bill_f = _pick(bill_ori)
    if bill_f:
        assignments.append(
            f"bill = bill::jsonb || '{_jsonb_literal(bill_f)}'::jsonb")
    # Decide on the picked dicts, not on serialized JSON: "{}" is truthy, so
    # the old string-based check never skipped a row.
    if not assignments:
        return ""
    return f"update table_test set {', '.join(assignments)} where table_test_id={table_test_id} "


def run_codegen():
    """Generate SQLAlchemy model code for ``table_test`` into ``./test_model.py``.

    This needs the ``sqlacodegen`` CLI (``pip install sqlacodegen``). The run
    is best-effort: a non-zero exit status is ignored, and a missing
    executable only prints a hint.
    """
    # Pass an argument list with no shell, so characters in the connection
    # URL (e.g. '&' or '?' in query options, spaces) are not shell-interpreted.
    cmd = [
        "sqlacodegen", "--outfile", "./test_model.py",
        pg_sql_str, "--tables", "table_test",
    ]
    try:
        subprocess.run(cmd, check=False)
    except FileNotFoundError:
        print("sqlacodegen not found; install it with: pip install sqlacodegen")


if __name__ == "__main__":
    # Script entry point: print a banner, then run the batch job over table_test.
    banner = "PG utils"
    print(banner)
    crypt_order()
  • 优化方法：将原表 a 复制为新表 b，在 b 上完成更新，然后交换表名。注意：(1) 更新 b 期间 a 中新增的数据需要同步到 b；(2) 交换表名期间，该表短暂不可用。
-- "table" stands for the real table name; TABLE is a reserved word in
-- PostgreSQL, so the placeholder must be double-quoted to be valid SQL.
-- CREATE TABLE ... AS copies rows only: the primary key, indexes,
-- constraints and defaults are NOT copied and must be recreated on table_bak.
CREATE TABLE table_bak AS TABLE "table";
-- ... encrypt table_bak, sync rows added to "table" meanwhile, recreate PK/indexes ...
-- Swap the names inside one transaction (PostgreSQL DDL is transactional),
-- so clients never observe a moment where "table" does not exist.
BEGIN;
ALTER TABLE "table" RENAME TO table_old;
ALTER TABLE table_bak RENAME TO "table";
COMMIT;

发表评论

评论列表,共 0 条评论

    暂无评论