需求
- 对数据多个字段加密
- 数据量 18w
解决
包括两个方面
应用程序
兼容密文和明文数据, 要求多次加密以及明文解密不会报错. - 添加前缀
已存在数据更新
- 简单方法 直接对原表更新. 基于1, 同时存在明文和密文也不会保存
import os
import arrow
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker, Session
from nomad_lib.utils import json_dumps_unicode
from nomad_lib.utils.encryption import encrypt, decrypt
# pg_sql_str = "postgresql://postgres:postgres@192.168.1.1:5432/db_one"
pg_sql_str = os.getenv("PG_LOGISTICS_CONN")
CRYPT_KEY = os.getenv('CRYPT_KEY')
CRYPT_IV = os.getenv('CRYPT_IV')
assert pg_sql_str is not None, "PG_LOGISTICS_DEV is not found"
engine = create_engine(pg_sql_str)
session: Session = scoped_session(sessionmaker(bind=engine))
def main():
session.execute(
" INSERT INTO test(name, school, score) VALUES ('name2', 'cb', 22);")
session.commit()
flights = session.execute(
"SELECT date, name, can_drive, school, score FROM test").fetchall()
for flight in flights:
print(
f"{flight.name} to {flight.school}, {flight.score} minutes."
)
def crypt_order():
"""
update table_test set ship_to = ship_to::jsonb || '{"address2": "address2"}'::jsonb
where carrier = 'test' and seller_order_ref = 'ssss'
and ship_to->>'tin' like '1%'
"""
batch_size = 500
where_txt = " " # " where table_test_id < 1000"
sql_count = "SELECT count(1) FROM table_test" + where_txt
waybills_count = session.execute(sql_count).fetchall()
for i in range(waybills_count[0][0] // batch_size + 1):
sql_select = "SELECT modify_time, table_test_id, order_ref, seller_order_ref, " \
"tracking_reference, ship_to, bill FROM table_test "
sql_select += where_txt
sql_select += "order by table_test_id "
sql_select += f"offset {i * batch_size} "
sql_select += f"limit {batch_size} "
waybills = session.execute(sql_select).fetchall()
print("waybills processed", i * batch_size, "processing", len(waybills), arrow.now())
# print([waybill.table_test_id for waybill in waybills])
for waybill in waybills:
# print(waybill, waybill.table_test_id, waybill.tracking_reference)
sql_update = get_sql_update_decrypt(
waybill.ship_to, waybill.bill, waybill.table_test_id)
# print(sql_update)
if sql_update:
session.execute(sql_update)
session.commit()
def get_sql_update_encrypt(ship_to_ori: dict, bill_ori: dict, table_test_id) -> str:
keys = ("address1", "crypt_field")
ship_to = ship_to_ori or dict()
ship_to_f = {key: ship_to.get(key) for key in keys if ship_to.get(key)}
ship_to_str = json_dumps_unicode(encrypt(ship_to_f, CRYPT_KEY, CRYPT_IV))
bill = bill_ori or dict()
bill_f = {key: bill.get(key) for key in keys if bill.get(key)}
bill_str = json_dumps_unicode(encrypt(bill_f, CRYPT_KEY, CRYPT_IV))
if not ship_to_str and not bill_str:
return ""
_update = f"update table_test set "
if ship_to_str and bill_str:
_update += f"ship_to = ship_to::jsonb || '{ship_to_str}'::jsonb, " \
f"bill = bill::jsonb || '{bill_str}'::jsonb "
elif ship_to_str:
_update += f"ship_to = ship_to::jsonb || '{ship_to_str}'::jsonb "
elif bill_str:
_update += f"bill = bill::jsonb || '{bill_str}'::jsonb "
_update += f"where table_test_id={table_test_id} "
return _update
def get_sql_update_decrypt(ship_to_ori: dict, bill_ori: dict, table_test_id) -> str:
keys = ("address1", "crypt_field")
ship_to = ship_to_ori or dict()
ship_to_f = {key: ship_to.get(key) for key in keys if ship_to.get(key)}
ship_to_str = json_dumps_unicode(decrypt(ship_to_f, CRYPT_KEY, CRYPT_IV))
bill = bill_ori or dict()
bill_f = {key: bill.get(key) for key in keys if bill.get(key)}
bill_str = json_dumps_unicode(decrypt(bill_f, CRYPT_KEY, CRYPT_IV))
if not ship_to_str and not bill_str:
return ""
_update = f"update table_test set "
if ship_to_str and bill_str:
_update += f"ship_to = ship_to::jsonb || '{ship_to_str}'::jsonb, " \
f"bill = bill::jsonb || '{bill_str}'::jsonb "
elif ship_to_str:
_update += f"ship_to = ship_to::jsonb || '{ship_to_str}'::jsonb "
elif bill_str:
_update += f"bill = bill::jsonb || '{bill_str}'::jsonb "
_update += f"where table_test_id={table_test_id} "
return _update
def run_codegen():
"""
pip install sqlacodegen
"""
args = 'sqlacodegen --outfile ' \
f'./test_model.py {pg_sql_str} --tables table_test'
os.system(args)
if __name__ == "__main__":
print("PG utils")
crypt_order()
- 优化 复制原表a到b, 更新新表b, 然后更新表名. 注意, 更新b表时的新增数据, 更新表名时, 数据库不可用.
CREATE TABLE table_bak AS TABLE table;
...加密并同步新数据到table_bak, table_bak无主键, 索引等
ALTER TABLE table RENAME TO table_new;
ALTER TABLE table_bak RENAME TO table;
评论列表,共 0 条评论
暂无评论