stones/scripts/moloch_dao/merge_duplicate_contacts.py

318 lines
13 KiB
Python

#!/usr/bin/env python3
"""
Merge Duplicate Contacts
This script finds and merges duplicate contacts in the database based on Ethereum addresses.
It keeps the most complete record and updates all relationships to point to the primary contact.
Usage:
python merge_duplicate_contacts.py
"""
import os
import sys
import logging
from typing import Dict, Any, List, Optional, Tuple
from dotenv import load_dotenv
# Make the parent of this script's directory importable so that the
# project-local `utils` package resolves when the file is run directly.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from utils.db_connector import DatabaseConnector
from utils.logger import setup_logger
# Load environment variables via python-dotenv.
# NOTE(review): this runs after the `utils` imports above; if those modules
# read environment variables at import time they will not see values from a
# .env file -- confirm that the connector reads its settings lazily.
load_dotenv()
# Module-wide logger shared by the merger class and main().
logger = setup_logger("duplicate_contact_merger")
class DuplicateContactMerger:
    """Find contacts that share an Ethereum address and merge each group.

    For every group one contact is chosen as the primary. DAO memberships
    and notes of the other (secondary) contacts are re-pointed to the
    primary, empty profile fields of the primary are filled from the
    secondaries, and the secondaries are then deleted.
    """

    # Contact columns that are copied from a secondary contact when the
    # primary's own value is NULL or empty.
    _FILLABLE_FIELDS = (
        "ensName",
        "twitter",
        "discord",
        "telegram",
        "email",
        "farcaster",
        "otherSocial",
    )

    def __init__(self):
        """Create the merger together with its database connector."""
        self.db = DatabaseConnector()

    def find_duplicate_contacts(self) -> List[Dict[str, Any]]:
        """Return one row per Ethereum address used by more than one contact.

        Each row carries ``ethereumAddress``, ``count`` and ``contact_ids``
        (the ids aggregated by the query). Empty-string addresses are
        excluded: grouping them would merge unrelated contacts that merely
        lack an address.

        NOTE(review): addresses are compared exactly, so the same address
        stored with different letter case is not detected as a duplicate.

        Returns:
            The duplicate groups, largest first; an empty list when the
            connector returns no rows.
        """
        query = """
            SELECT "ethereumAddress", COUNT(*) as count, array_agg(id) as contact_ids
            FROM "Contact"
            WHERE "ethereumAddress" IS NOT NULL
              AND "ethereumAddress" != ''
            GROUP BY "ethereumAddress"
            HAVING COUNT(*) > 1
            ORDER BY COUNT(*) DESC
        """
        # merge handling elsewhere in this class already treats a falsy
        # query result as "no rows"; do the same here so callers can len() it.
        return self.db.execute_query(query) or []

    def get_contact_details(self, contact_id: str) -> Dict[str, Any]:
        """Return the core columns of one contact, or ``{}`` if it is missing."""
        query = """
            SELECT id, "ethereumAddress", "ensName", name, "createdAt", "updatedAt"
            FROM "Contact"
            WHERE id = %(contact_id)s
        """
        rows = self.db.execute_query(query, {"contact_id": contact_id})
        return rows[0] if rows else {}

    def determine_primary_contact(self, contact_ids: List[str]) -> Tuple[str, List[str]]:
        """Choose which contact of a duplicate group is kept.

        Preference order:
            1. Contacts with a non-empty ENS name
            2. Contacts whose name does not start with the generic "RG_" prefix
            3. Older contacts (earlier ``createdAt``; a missing date sorts last)

        Contacts that can no longer be loaded are ignored.

        Returns:
            Tuple of (primary_contact_id, secondary_contact_ids).

        Raises:
            ValueError: if none of the given contacts exists.
        """
        contacts = [
            details
            for details in (self.get_contact_details(cid) for cid in contact_ids)
            if details
        ]
        if not contacts:
            raise ValueError(f"No existing contacts among ids: {contact_ids}")

        def preference(contact: Dict[str, Any]) -> Tuple[Any, ...]:
            created_at = contact.get("createdAt")
            return (
                # Empty string counts as "no ENS name", matching the check
                # used when filling missing fields during the merge.
                not contact.get("ensName"),
                # `name` may be NULL in the database, hence the `or ""`.
                (contact.get("name") or "").startswith("RG_"),
                # The flag keeps None from ever being compared with a date.
                created_at is None,
                created_at,
            )

        ranked = sorted(contacts, key=preference)
        return ranked[0]["id"], [contact["id"] for contact in ranked[1:]]

    def merge_contacts(self, primary_id: str, secondary_ids: List[str]) -> bool:
        """Merge the secondary contacts into the primary contact.

        Steps, executed between BEGIN and COMMIT statements:
            1. Move or fold DAO memberships into the primary
            2. Re-point notes to the primary
            3. Fill the primary's empty profile fields from the secondaries
            4. Delete the secondary contacts

        Any error triggers a ROLLBACK statement and is logged.

        Returns:
            True if successful (including when there is nothing to merge),
            False otherwise.
        """
        # Never treat the primary as its own duplicate: it would be deleted.
        secondary_ids = [sid for sid in secondary_ids if sid != primary_id]
        if not secondary_ids:
            # Nothing to do; also avoids sending an empty tuple to the
            # `IN %(secondary_ids)s` clauses below, which is presumably
            # rendered as the invalid SQL `IN ()` by the driver.
            return True

        try:
            self.db.execute_query("BEGIN")

            primary = self.get_contact_details(primary_id)
            if not primary:
                # Re-pointing rows to a contact that does not exist would
                # either fail later or orphan them.
                raise ValueError(f"Primary contact {primary_id} not found")
            logger.info(f"Merging contacts into primary: {primary_id} ({primary.get('name')})")

            self._merge_dao_memberships(primary_id, secondary_ids)
            self._transfer_notes(primary_id, secondary_ids)
            self._fill_missing_fields(primary_id, secondary_ids)

            self.db.execute_update("""
                DELETE FROM "Contact"
                WHERE id IN %(secondary_ids)s
            """, {
                "secondary_ids": tuple(secondary_ids)
            })

            self.db.execute_query("COMMIT")
            logger.info(f"Successfully merged {len(secondary_ids)} contacts into {primary_id}")
            return True
        except Exception as e:
            logger.exception(f"Error merging contacts: {e}")
            try:
                self.db.execute_query("ROLLBACK")
            except Exception as rollback_error:
                # A failing rollback must not hide the original error or
                # abort the remaining duplicate groups.
                logger.error(f"Rollback failed: {rollback_error}")
            return False

    def _merge_dao_memberships(self, primary_id: str, secondary_ids: List[str]) -> None:
        """Move the secondaries' DAO memberships onto the primary contact."""
        primary_rows = self.db.execute_query("""
            SELECT id, "daoName", "joinedAt" FROM "DaoMembership"
            WHERE "contactId" = %(contact_id)s
        """, {"contact_id": primary_id}) or []
        # daoName -> membership currently owned by the primary; kept up to
        # date while merging so the primary is not re-queried per DAO.
        owned = {
            row["daoName"]: {"id": row["id"], "joinedAt": row["joinedAt"]}
            for row in primary_rows
        }
        logger.info(f"Primary contact is already a member of: {', '.join(owned) if owned else 'No DAOs'}")

        for secondary_id in secondary_ids:
            secondary_daos = self.db.execute_query("""
                SELECT id, "daoName", "daoType", "joinedAt"
                FROM "DaoMembership"
                WHERE "contactId" = %(contact_id)s
            """, {"contact_id": secondary_id})
            if not secondary_daos:
                logger.info(f"Secondary contact {secondary_id} has no DAO memberships")
                continue
            logger.info(f"Secondary contact {secondary_id} is a member of: {', '.join([dao['daoName'] for dao in secondary_daos])}")

            for dao in secondary_daos:
                dao_name = dao["daoName"]
                existing = owned.get(dao_name)

                if existing is None:
                    # Primary lacks this membership: hand the row over.
                    self.db.execute_update("""
                        UPDATE "DaoMembership"
                        SET "contactId" = %(primary_id)s,
                            "updatedAt" = NOW()
                        WHERE id = %(membership_id)s
                    """, {
                        "primary_id": primary_id,
                        "membership_id": dao["id"]
                    })
                    owned[dao_name] = {"id": dao["id"], "joinedAt": dao["joinedAt"]}
                    logger.info(f"Transferred {dao_name} membership from secondary to primary contact")
                    continue

                # Primary already has this membership: keep the earliest join date.
                if dao["joinedAt"] and (not existing["joinedAt"] or dao["joinedAt"] < existing["joinedAt"]):
                    self.db.execute_update("""
                        UPDATE "DaoMembership"
                        SET "joinedAt" = %(joined_at)s,
                            "updatedAt" = NOW()
                        WHERE id = %(membership_id)s
                    """, {
                        "membership_id": existing["id"],
                        "joined_at": dao["joinedAt"]
                    })
                    existing["joinedAt"] = dao["joinedAt"]
                    logger.info(f"Updated join date for {dao_name} membership of primary contact")

                # The secondary's row is now redundant. Remove it so it does
                # not keep referencing a contact that is about to be deleted.
                # NOTE(review): assumes nothing else references this row.
                self.db.execute_update("""
                    DELETE FROM "DaoMembership"
                    WHERE id = %(membership_id)s
                """, {"membership_id": dao["id"]})

    def _transfer_notes(self, primary_id: str, secondary_ids: List[str]) -> None:
        """Re-point every note of the secondary contacts to the primary."""
        self.db.execute_update("""
            UPDATE "Note"
            SET "contactId" = %(primary_id)s,
                "updatedAt" = NOW()
            WHERE "contactId" IN %(secondary_ids)s
        """, {
            "primary_id": primary_id,
            "secondary_ids": tuple(secondary_ids)
        })
        logger.info("Transferred notes from secondary contacts to primary")

    def _fill_missing_fields(self, primary_id: str, secondary_ids: List[str]) -> None:
        """Copy ENS name and social handles the primary lacks from a secondary."""
        # Column names come from the fixed class constant, never from input,
        # so interpolating them into the SQL text is safe.
        columns = ", ".join(f'"{field}"' for field in self._FILLABLE_FIELDS)
        primary_rows = self.db.execute_query(f"""
            SELECT {columns} FROM "Contact"
            WHERE id = %(primary_id)s
        """, {"primary_id": primary_id})
        if not primary_rows:
            raise ValueError(f"Primary contact {primary_id} not found")
        current = primary_rows[0]
        donors = tuple(secondary_ids)

        for field in self._FILLABLE_FIELDS:
            if current[field]:
                # Primary already has a value; never overwrite it.
                continue
            # Empty strings are filtered in SQL so LIMIT 1 cannot pick an
            # empty value while another secondary holds a real one; the
            # ORDER BY makes the chosen donor deterministic (oldest first).
            donor_rows = self.db.execute_query(f"""
                SELECT "{field}" FROM "Contact"
                WHERE id IN %(secondary_ids)s
                  AND "{field}" IS NOT NULL
                  AND "{field}" != ''
                ORDER BY "createdAt"
                LIMIT 1
            """, {"secondary_ids": donors})
            if not donor_rows or not donor_rows[0][field]:
                continue
            value = donor_rows[0][field]
            self.db.execute_update(f"""
                UPDATE "Contact"
                SET "{field}" = %(value)s,
                    "updatedAt" = NOW()
                WHERE id = %(primary_id)s
            """, {
                "primary_id": primary_id,
                "value": value
            })
            label = "ENS name" if field == "ensName" else field
            logger.info(f"Updated primary contact with {label}: {value}")

    def run(self) -> int:
        """Merge every duplicate group and return how many contacts were merged away."""
        logger.info("Starting duplicate contact merger")
        duplicates = self.find_duplicate_contacts()
        logger.info(f"Found {len(duplicates)} Ethereum addresses with duplicate contacts")

        total_merged = 0
        for dup in duplicates:
            eth_address = dup["ethereumAddress"]
            count = dup["count"]
            contact_ids = dup["contact_ids"]
            logger.info(f"Processing {count} duplicates for address {eth_address}")

            try:
                primary_id, secondary_ids = self.determine_primary_contact(contact_ids)
            except ValueError as e:
                # One unresolvable group must not abort the remaining ones.
                logger.error(f"Skipping address {eth_address}: {e}")
                continue
            if not secondary_ids:
                logger.info(f"Nothing left to merge for address {eth_address}")
                continue

            if self.merge_contacts(primary_id, secondary_ids):
                total_merged += len(secondary_ids)

        logger.info(f"Merged a total of {total_merged} duplicate contacts")
        return total_merged
def main():
    """Run the duplicate-contact merger and translate the outcome to an exit code.

    Returns:
        0 when the run finished, 1 when any exception escaped it (the
        exception is logged with its traceback).
    """
    try:
        merged_count = DuplicateContactMerger().run()
        logger.info(f"Duplicate contact merging completed successfully. Merged {merged_count} contacts.")
    except Exception as exc:
        logger.exception(f"Error merging duplicate contacts: {exc}")
        return 1
    return 0
if __name__ == "__main__":
    # Exit the process with main()'s return value as the status code.
    raise SystemExit(main())