इस विशेष मामले में डीबी-एपीआई स्तर तक गिरना बेहतर है, क्योंकि आपको कुछ ऐसे टूल की आवश्यकता है जो सीधे SQLAlchemy कोर द्वारा भी उजागर नहीं किए जाते हैं, जैसे कि copy_expert()
. यह raw_connection()
. यदि आपका स्रोत डेटा एक CSV फ़ाइल है, तो आपको इस मामले में पांडा की बिल्कुल भी आवश्यकता नहीं है। एक अस्थायी स्टेजिंग तालिका बनाकर प्रारंभ करें, डेटा को अस्थायी तालिका में कॉपी करें, और संघर्ष प्रबंधन के साथ गंतव्य तालिका में सम्मिलित करें:
conn = engine.raw_connection()
try:
with conn.cursor() as cur:
cur.execute("""CREATE TEMPORARY TABLE TEST_STAGING ( LIKE TEST_TABLE )
ON COMMIT DROP""")
with open("your_source.csv") as data:
cur.copy_expert("""COPY TEST_STAGING ( itemid, title, street, pincode )
FROM STDIN WITH CSV""", data)
cur.execute("""INSERT INTO TEST_TABLE ( itemid, title, street, pincode )
SELECT itemid, title, street, pincode
FROM TEST_STAGING
ON CONFLICT ( itemid )
DO UPDATE SET title = EXCLUDED.title
, street = EXCLUDED.street
, pincode = EXCLUDED.pincode""")
except:
conn.rollback()
raise
else:
conn.commit()
finally:
conn.close()
यदि दूसरी ओर आपका स्रोत डेटा DataFrame
है , आप अभी भी COPY
. का उपयोग कर सकते हैं फ़ंक्शन को method=
करने के लिए to_sql()
. फ़ंक्शन उपरोक्त सभी तर्कों को छुपा भी सकता है:
import csv
from io import StringIO
from psycopg2 import sql
def psql_upsert_copy(table, conn, keys, data_iter):
dbapi_conn = conn.connection
buf = StringIO()
writer = csv.writer(buf)
writer.writerows(data_iter)
buf.seek(0)
if table.schema:
table_name = sql.SQL("{}.{}").format(
sql.Identifier(table.schema), sql.Identifier(table.name))
else:
table_name = sql.Identifier(table.name)
tmp_table_name = sql.Identifier(table.name + "_staging")
columns = sql.SQL(", ").join(map(sql.Identifier, keys))
with dbapi_conn.cursor() as cur:
# Create the staging table
stmt = "CREATE TEMPORARY TABLE {} ( LIKE {} ) ON COMMIT DROP"
stmt = sql.SQL(stmt).format(tmp_table_name, table_name)
cur.execute(stmt)
# Populate the staging table
stmt = "COPY {} ( {} ) FROM STDIN WITH CSV"
stmt = sql.SQL(stmt).format(tmp_table_name, columns)
cur.copy_expert(stmt, buf)
# Upsert from the staging table to the destination. First find
# out what the primary key columns are.
stmt = """
SELECT kcu.column_name
FROM information_schema.table_constraints tco
JOIN information_schema.key_column_usage kcu
ON kcu.constraint_name = tco.constraint_name
AND kcu.constraint_schema = tco.constraint_schema
WHERE tco.constraint_type = 'PRIMARY KEY'
AND tco.table_name = %s
"""
args = (table.name,)
if table.schema:
stmt += "AND tco.table_schema = %s"
args += (table.schema,)
cur.execute(stmt, args)
pk_columns = {row[0] for row in cur.fetchall()}
# Separate "data" columns from (primary) key columns
data_columns = [k for k in keys if k not in pk_columns]
# Build conflict_target
pk_columns = sql.SQL(", ").join(map(sql.Identifier, pk_columns))
set_ = sql.SQL(", ").join([
sql.SQL("{} = EXCLUDED.{}").format(k, k)
for k in map(sql.Identifier, data_columns)])
stmt = """
INSERT INTO {} ( {} )
SELECT {}
FROM {}
ON CONFLICT ( {} )
DO UPDATE SET {}
"""
stmt = sql.SQL(stmt).format(
table_name, columns, columns, tmp_table_name, pk_columns, set_)
cur.execute(stmt)
फिर आप नया DataFrame
डालेंगे
df.to_sql("test_table", engine,
method=psql_upsert_copy,
index=False,
if_exists="append")
इस पद्धति का उपयोग करते हुए ~1,000,000 पंक्तियों को स्थानीय डेटाबेस के साथ इस मशीन पर लगभग 16 सेकंड का समय लगा।