From e7f81d3219ee60102cc220fca4a5acfb14174e6b Mon Sep 17 00:00:00 2001 From: boilerrat <128boilerrat@gmail.com> Date: Sun, 16 Mar 2025 19:29:06 -0400 Subject: [PATCH] Add Meta Cartel DAO member import script --- scripts/moloch_dao/import_metacartel_csv.py | 340 ++++++++++++++++++++ 1 file changed, 340 insertions(+) create mode 100644 scripts/moloch_dao/import_metacartel_csv.py diff --git a/scripts/moloch_dao/import_metacartel_csv.py b/scripts/moloch_dao/import_metacartel_csv.py new file mode 100644 index 0000000..6025e8a --- /dev/null +++ b/scripts/moloch_dao/import_metacartel_csv.py @@ -0,0 +1,340 @@ +#!/usr/bin/env python3 +""" +Import Meta Cartel Members from CSV + +This script imports Meta Cartel DAO members from a CSV file exported from DAOhaus. +It adds the members to the database with proper DAO membership records and notes, +while ensuring no duplicates are created. + +Usage: + python import_metacartel_csv.py +""" + +import os +import sys +import csv +import logging +from typing import Dict, Any, List, Optional +from datetime import datetime +from dotenv import load_dotenv + +# Add parent directory to path to import utils +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +from utils.db_connector import DatabaseConnector +from utils.logger import setup_logger + +# Load environment variables +load_dotenv() + +# Setup logging +logger = setup_logger("metacartel_importer") + +class MetaCartelImporter: + """Importer for Meta Cartel members from CSV file""" + + def __init__(self, csv_path: str): + """Initialize the importer""" + self.csv_path = csv_path + + # Initialize database + self.db = DatabaseConnector() + + # Register data source + self.data_source_id = self.register_data_source() + + def register_data_source(self) -> str: + """Register the data source in the database""" + query = """ + INSERT INTO "DataSource" ( + id, name, type, description, "createdAt", "updatedAt" + ) + VALUES ( + gen_random_uuid(), %(name)s, %(type)s, %(description)s, NOW(), NOW() + ) + ON CONFLICT (name) DO UPDATE + SET type = EXCLUDED.type, + description = EXCLUDED.description, + "updatedAt" = NOW() + RETURNING id + """ + + result = self.db.execute_query(query, { + "name": "Meta Cartel DAO CSV", + "description": "Meta Cartel is a Moloch DAO on Ethereum Mainnet. Imported from CSV export.", + "type": "blockchain" + }) + + data_source_id = result[0]["id"] + logger.info(f"Registered data source with ID: {data_source_id}") + return data_source_id + + def read_csv(self) -> List[Dict[str, Any]]: + """Read the CSV file and return a list of members""" + members = [] + + try: + with open(self.csv_path, 'r') as csvfile: + reader = csv.DictReader(csvfile) + for row in reader: + # Only include members that exist and haven't ragequit + if row.get('exists', '').lower() == 'true' and row.get('didRagequit', '').lower() == 'false': + members.append({ + "address": row.get('memberAddress', '').lower(), + "delegateKey": row.get('delegateKey', '').lower(), + "shares": int(row.get('shares', 0)), + "loot": int(row.get('loot', 0)), + "joined_at": row.get('createdAt', None) + }) + except Exception as e: + logger.error(f"Error reading CSV file: {e}") + raise + + logger.info(f"Read {len(members)} members from CSV file") + return members + + def get_ens_name_for_address(self, address: str) -> Optional[str]: + """Check if we already have an ENS name for this address in the database""" + query = 'SELECT "ensName" FROM "Contact" WHERE "ethereumAddress" = %(address)s AND "ensName" IS NOT NULL' + result = self.db.execute_query(query, {"address": address}) + + if result and result[0]["ensName"]: + return result[0]["ensName"] + return None + + def check_existing_dao_membership(self, contact_id: str, dao_name: str) -> Optional[Dict[str, Any]]: + """Check if a contact already has a membership in the specified DAO""" + query = """ + SELECT id, "daoType", "joinedAt" + FROM "DaoMembership" + WHERE "contactId" = %(contact_id)s AND "daoName" = %(dao_name)s + """ + + result = self.db.execute_query(query, { + "contact_id": contact_id, + "dao_name": dao_name + }) + + if result: + return result[0] + return None + + def generate_name_for_contact(self, address: str, ens_name: Optional[str] = None) -> str: + """Generate a better name for the contact based on ENS or address""" + if ens_name: + # Use ENS name without .eth suffix as the name + if ens_name.endswith('.eth'): + return ens_name[:-4] # Remove .eth suffix + return ens_name + + # Use shortened address if no ENS name + return f"MC_{address[:6]}...{address[-4:]}" + + def process_member(self, member: Dict[str, Any]) -> Optional[str]: + """Process a member and add to the database""" + address = member["address"] + + # Check if contact already exists and get ENS name if available + query = 'SELECT id, "ensName", name FROM "Contact" WHERE "ethereumAddress" = %(address)s' + result = self.db.execute_query(query, {"address": address}) + + contact_id = None + ens_name = None + + if result: + # Contact exists, update if needed + contact_id = result[0]["id"] + ens_name = result[0].get("ensName") + current_name = result[0].get("name") + + logger.info(f"Contact already exists for {address} with ID {contact_id}") + + # Update the contact if needed (e.g., if name is generic or missing ENS) + if (current_name and current_name.startswith("ETH_")) or (not ens_name and self.get_ens_name_for_address(address)): + # Get ENS name if we don't have one + if not ens_name: + ens_name = self.get_ens_name_for_address(address) + + # Generate a better name + name = self.generate_name_for_contact(address, ens_name) + + # Update the contact + update_query = """ + UPDATE "Contact" + SET "ensName" = COALESCE(%(ens_name)s, "ensName"), + name = %(name)s, + "updatedAt" = NOW() + WHERE id = %(contact_id)s + """ + + self.db.execute_update(update_query, { + "contact_id": contact_id, + "ens_name": ens_name, + "name": name + }) + + logger.info(f"Updated contact {contact_id} with name '{name}' and ENS '{ens_name}'") + else: + # Create new contact + # Check if we have an ENS name for this address + ens_name = self.get_ens_name_for_address(address) + + # Generate a better name for the contact + name = self.generate_name_for_contact(address, ens_name) + + # Create new contact + query = """ + INSERT INTO "Contact" ( + id, "ethereumAddress", "ensName", name, "createdAt", "updatedAt" + ) + VALUES ( + gen_random_uuid(), %(address)s, %(ens_name)s, %(name)s, NOW(), NOW() + ) + RETURNING id + """ + + result = self.db.execute_query(query, { + "address": address, + "ens_name": ens_name, + "name": name + }) + + if not result: + logger.error(f"Failed to add contact for {address}") + return None + + contact_id = result[0]["id"] + logger.info(f"Added new contact: {address} with ID {contact_id}") + + # Check if DAO membership already exists + existing_membership = self.check_existing_dao_membership(contact_id, "Meta Cartel") + + # Parse joined_at timestamp + joined_at = None + if member.get("joined_at"): + try: + # Convert Unix timestamp to datetime + joined_at_timestamp = int(member["joined_at"]) + joined_at = datetime.fromtimestamp(joined_at_timestamp) + except (ValueError, TypeError): + joined_at = None + + if existing_membership: + # Only update if we have better information + if joined_at and (not existing_membership.get("joinedAt") or existing_membership.get("joinedAt") > joined_at): + # Update with better join date + update_query = """ + UPDATE "DaoMembership" + SET "joinedAt" = %(joined_at)s, + "updatedAt" = NOW() + WHERE id = %(membership_id)s + """ + + self.db.execute_update(update_query, { + "membership_id": existing_membership["id"], + "joined_at": joined_at + }) + + logger.info(f"Updated existing DAO membership for contact {contact_id} with better join date") + else: + # Add new DAO membership + query = """ + INSERT INTO "DaoMembership" ( + id, "contactId", "daoName", "daoType", "joinedAt", "createdAt", "updatedAt" + ) + VALUES ( + gen_random_uuid(), %(contact_id)s, %(dao_name)s, %(dao_type)s, + %(joined_at)s, NOW(), NOW() + ) + """ + + self.db.execute_update(query, { + "contact_id": contact_id, + "dao_name": "Meta Cartel", + "dao_type": "Moloch DAO", + "joined_at": joined_at + }) + + logger.info(f"Added new DAO membership for contact {contact_id}") + + # Check if a similar note already exists + note_content = f"Member of Meta Cartel DAO with {member['shares']} shares and {member['loot']} loot" + check_note_query = """ + SELECT id FROM "Note" + WHERE "contactId" = %(contact_id)s + AND content LIKE %(content_pattern)s + """ + + existing_note = self.db.execute_query(check_note_query, { + "contact_id": contact_id, + "content_pattern": "%Member of Meta Cartel DAO%shares%loot%" + }) + + if existing_note: + # Update the existing note + update_note_query = """ + UPDATE "Note" + SET content = %(content)s, + "updatedAt" = NOW() + WHERE id = %(note_id)s + """ + + self.db.execute_update(update_note_query, { + "note_id": existing_note[0]["id"], + "content": note_content + }) + + logger.info(f"Updated existing note for contact {contact_id}") + else: + # Add a note about the member's shares and loot + query = """ + INSERT INTO "Note" ( + id, "contactId", content, "createdAt", "updatedAt" + ) + VALUES ( + gen_random_uuid(), %(contact_id)s, %(content)s, NOW(), NOW() + ) + """ + + self.db.execute_update(query, { + "contact_id": contact_id, + "content": note_content + }) + + logger.info(f"Added new note for contact {contact_id}") + + return contact_id + + def run(self): + """Run the importer""" + logger.info(f"Starting Meta Cartel member import from {self.csv_path}") + + # Read members from CSV + members = self.read_csv() + + # Process members + processed_count = 0 + for member in members: + if self.process_member(member): + processed_count += 1 + + logger.info(f"Processed {processed_count} members out of {len(members)} found") + return processed_count + +def main(): + """Main function""" + try: + csv_path = os.path.join( + os.path.dirname(os.path.abspath(__file__)), + "metacartel-v2-hd_Members_1742164186.csv" + ) + + importer = MetaCartelImporter(csv_path) + processed_count = importer.run() + logger.info(f"Import completed successfully. Processed {processed_count} members.") + return 0 + except Exception as e: + logger.exception(f"Error running importer: {e}") + return 1 + +if __name__ == "__main__": + sys.exit(main()) \ No newline at end of file