Add Meta Cartel DAO member import script
This commit is contained in:
parent
cd4ffa3665
commit
e7f81d3219
|
|
@ -0,0 +1,340 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
Import Meta Cartel Members from CSV
|
||||
|
||||
This script imports Meta Cartel DAO members from a CSV file exported from DAOhaus.
|
||||
It adds the members to the database with proper DAO membership records and notes,
|
||||
while ensuring no duplicates are created.
|
||||
|
||||
Usage:
|
||||
python import_metacartel_csv.py
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import csv
|
||||
import logging
|
||||
from typing import Dict, Any, List, Optional
|
||||
from datetime import datetime
|
||||
from dotenv import load_dotenv
|
||||
|
||||
# Add parent directory to path to import utils
|
||||
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
from utils.db_connector import DatabaseConnector
|
||||
from utils.logger import setup_logger
|
||||
|
||||
# Load environment variables
|
||||
load_dotenv()
|
||||
|
||||
# Setup logging
|
||||
logger = setup_logger("metacartel_importer")
|
||||
|
||||
class MetaCartelImporter:
|
||||
"""Importer for Meta Cartel members from CSV file"""
|
||||
|
||||
def __init__(self, csv_path: str):
|
||||
"""Initialize the importer"""
|
||||
self.csv_path = csv_path
|
||||
|
||||
# Initialize database
|
||||
self.db = DatabaseConnector()
|
||||
|
||||
# Register data source
|
||||
self.data_source_id = self.register_data_source()
|
||||
|
||||
def register_data_source(self) -> str:
|
||||
"""Register the data source in the database"""
|
||||
query = """
|
||||
INSERT INTO "DataSource" (
|
||||
id, name, type, description, "createdAt", "updatedAt"
|
||||
)
|
||||
VALUES (
|
||||
gen_random_uuid(), %(name)s, %(type)s, %(description)s, NOW(), NOW()
|
||||
)
|
||||
ON CONFLICT (name) DO UPDATE
|
||||
SET type = EXCLUDED.type,
|
||||
description = EXCLUDED.description,
|
||||
"updatedAt" = NOW()
|
||||
RETURNING id
|
||||
"""
|
||||
|
||||
result = self.db.execute_query(query, {
|
||||
"name": "Meta Cartel DAO CSV",
|
||||
"description": "Meta Cartel is a Moloch DAO on Ethereum Mainnet. Imported from CSV export.",
|
||||
"type": "blockchain"
|
||||
})
|
||||
|
||||
data_source_id = result[0]["id"]
|
||||
logger.info(f"Registered data source with ID: {data_source_id}")
|
||||
return data_source_id
|
||||
|
||||
def read_csv(self) -> List[Dict[str, Any]]:
|
||||
"""Read the CSV file and return a list of members"""
|
||||
members = []
|
||||
|
||||
try:
|
||||
with open(self.csv_path, 'r') as csvfile:
|
||||
reader = csv.DictReader(csvfile)
|
||||
for row in reader:
|
||||
# Only include members that exist and haven't ragequit
|
||||
if row.get('exists', '').lower() == 'true' and row.get('didRagequit', '').lower() == 'false':
|
||||
members.append({
|
||||
"address": row.get('memberAddress', '').lower(),
|
||||
"delegateKey": row.get('delegateKey', '').lower(),
|
||||
"shares": int(row.get('shares', 0)),
|
||||
"loot": int(row.get('loot', 0)),
|
||||
"joined_at": row.get('createdAt', None)
|
||||
})
|
||||
except Exception as e:
|
||||
logger.error(f"Error reading CSV file: {e}")
|
||||
raise
|
||||
|
||||
logger.info(f"Read {len(members)} members from CSV file")
|
||||
return members
|
||||
|
||||
def get_ens_name_for_address(self, address: str) -> Optional[str]:
|
||||
"""Check if we already have an ENS name for this address in the database"""
|
||||
query = 'SELECT "ensName" FROM "Contact" WHERE "ethereumAddress" = %(address)s AND "ensName" IS NOT NULL'
|
||||
result = self.db.execute_query(query, {"address": address})
|
||||
|
||||
if result and result[0]["ensName"]:
|
||||
return result[0]["ensName"]
|
||||
return None
|
||||
|
||||
def check_existing_dao_membership(self, contact_id: str, dao_name: str) -> Optional[Dict[str, Any]]:
|
||||
"""Check if a contact already has a membership in the specified DAO"""
|
||||
query = """
|
||||
SELECT id, "daoType", "joinedAt"
|
||||
FROM "DaoMembership"
|
||||
WHERE "contactId" = %(contact_id)s AND "daoName" = %(dao_name)s
|
||||
"""
|
||||
|
||||
result = self.db.execute_query(query, {
|
||||
"contact_id": contact_id,
|
||||
"dao_name": dao_name
|
||||
})
|
||||
|
||||
if result:
|
||||
return result[0]
|
||||
return None
|
||||
|
||||
def generate_name_for_contact(self, address: str, ens_name: Optional[str] = None) -> str:
|
||||
"""Generate a better name for the contact based on ENS or address"""
|
||||
if ens_name:
|
||||
# Use ENS name without .eth suffix as the name
|
||||
if ens_name.endswith('.eth'):
|
||||
return ens_name[:-4] # Remove .eth suffix
|
||||
return ens_name
|
||||
|
||||
# Use shortened address if no ENS name
|
||||
return f"MC_{address[:6]}...{address[-4:]}"
|
||||
|
||||
def process_member(self, member: Dict[str, Any]) -> Optional[str]:
|
||||
"""Process a member and add to the database"""
|
||||
address = member["address"]
|
||||
|
||||
# Check if contact already exists and get ENS name if available
|
||||
query = 'SELECT id, "ensName", name FROM "Contact" WHERE "ethereumAddress" = %(address)s'
|
||||
result = self.db.execute_query(query, {"address": address})
|
||||
|
||||
contact_id = None
|
||||
ens_name = None
|
||||
|
||||
if result:
|
||||
# Contact exists, update if needed
|
||||
contact_id = result[0]["id"]
|
||||
ens_name = result[0].get("ensName")
|
||||
current_name = result[0].get("name")
|
||||
|
||||
logger.info(f"Contact already exists for {address} with ID {contact_id}")
|
||||
|
||||
# Update the contact if needed (e.g., if name is generic or missing ENS)
|
||||
if (current_name and current_name.startswith("ETH_")) or (not ens_name and self.get_ens_name_for_address(address)):
|
||||
# Get ENS name if we don't have one
|
||||
if not ens_name:
|
||||
ens_name = self.get_ens_name_for_address(address)
|
||||
|
||||
# Generate a better name
|
||||
name = self.generate_name_for_contact(address, ens_name)
|
||||
|
||||
# Update the contact
|
||||
update_query = """
|
||||
UPDATE "Contact"
|
||||
SET "ensName" = COALESCE(%(ens_name)s, "ensName"),
|
||||
name = %(name)s,
|
||||
"updatedAt" = NOW()
|
||||
WHERE id = %(contact_id)s
|
||||
"""
|
||||
|
||||
self.db.execute_update(update_query, {
|
||||
"contact_id": contact_id,
|
||||
"ens_name": ens_name,
|
||||
"name": name
|
||||
})
|
||||
|
||||
logger.info(f"Updated contact {contact_id} with name '{name}' and ENS '{ens_name}'")
|
||||
else:
|
||||
# Create new contact
|
||||
# Check if we have an ENS name for this address
|
||||
ens_name = self.get_ens_name_for_address(address)
|
||||
|
||||
# Generate a better name for the contact
|
||||
name = self.generate_name_for_contact(address, ens_name)
|
||||
|
||||
# Create new contact
|
||||
query = """
|
||||
INSERT INTO "Contact" (
|
||||
id, "ethereumAddress", "ensName", name, "createdAt", "updatedAt"
|
||||
)
|
||||
VALUES (
|
||||
gen_random_uuid(), %(address)s, %(ens_name)s, %(name)s, NOW(), NOW()
|
||||
)
|
||||
RETURNING id
|
||||
"""
|
||||
|
||||
result = self.db.execute_query(query, {
|
||||
"address": address,
|
||||
"ens_name": ens_name,
|
||||
"name": name
|
||||
})
|
||||
|
||||
if not result:
|
||||
logger.error(f"Failed to add contact for {address}")
|
||||
return None
|
||||
|
||||
contact_id = result[0]["id"]
|
||||
logger.info(f"Added new contact: {address} with ID {contact_id}")
|
||||
|
||||
# Check if DAO membership already exists
|
||||
existing_membership = self.check_existing_dao_membership(contact_id, "Meta Cartel")
|
||||
|
||||
# Parse joined_at timestamp
|
||||
joined_at = None
|
||||
if member.get("joined_at"):
|
||||
try:
|
||||
# Convert Unix timestamp to datetime
|
||||
joined_at_timestamp = int(member["joined_at"])
|
||||
joined_at = datetime.fromtimestamp(joined_at_timestamp)
|
||||
except (ValueError, TypeError):
|
||||
joined_at = None
|
||||
|
||||
if existing_membership:
|
||||
# Only update if we have better information
|
||||
if joined_at and (not existing_membership.get("joinedAt") or existing_membership.get("joinedAt") > joined_at):
|
||||
# Update with better join date
|
||||
update_query = """
|
||||
UPDATE "DaoMembership"
|
||||
SET "joinedAt" = %(joined_at)s,
|
||||
"updatedAt" = NOW()
|
||||
WHERE id = %(membership_id)s
|
||||
"""
|
||||
|
||||
self.db.execute_update(update_query, {
|
||||
"membership_id": existing_membership["id"],
|
||||
"joined_at": joined_at
|
||||
})
|
||||
|
||||
logger.info(f"Updated existing DAO membership for contact {contact_id} with better join date")
|
||||
else:
|
||||
# Add new DAO membership
|
||||
query = """
|
||||
INSERT INTO "DaoMembership" (
|
||||
id, "contactId", "daoName", "daoType", "joinedAt", "createdAt", "updatedAt"
|
||||
)
|
||||
VALUES (
|
||||
gen_random_uuid(), %(contact_id)s, %(dao_name)s, %(dao_type)s,
|
||||
%(joined_at)s, NOW(), NOW()
|
||||
)
|
||||
"""
|
||||
|
||||
self.db.execute_update(query, {
|
||||
"contact_id": contact_id,
|
||||
"dao_name": "Meta Cartel",
|
||||
"dao_type": "Moloch DAO",
|
||||
"joined_at": joined_at
|
||||
})
|
||||
|
||||
logger.info(f"Added new DAO membership for contact {contact_id}")
|
||||
|
||||
# Check if a similar note already exists
|
||||
note_content = f"Member of Meta Cartel DAO with {member['shares']} shares and {member['loot']} loot"
|
||||
check_note_query = """
|
||||
SELECT id FROM "Note"
|
||||
WHERE "contactId" = %(contact_id)s
|
||||
AND content LIKE %(content_pattern)s
|
||||
"""
|
||||
|
||||
existing_note = self.db.execute_query(check_note_query, {
|
||||
"contact_id": contact_id,
|
||||
"content_pattern": "%Member of Meta Cartel DAO%shares%loot%"
|
||||
})
|
||||
|
||||
if existing_note:
|
||||
# Update the existing note
|
||||
update_note_query = """
|
||||
UPDATE "Note"
|
||||
SET content = %(content)s,
|
||||
"updatedAt" = NOW()
|
||||
WHERE id = %(note_id)s
|
||||
"""
|
||||
|
||||
self.db.execute_update(update_note_query, {
|
||||
"note_id": existing_note[0]["id"],
|
||||
"content": note_content
|
||||
})
|
||||
|
||||
logger.info(f"Updated existing note for contact {contact_id}")
|
||||
else:
|
||||
# Add a note about the member's shares and loot
|
||||
query = """
|
||||
INSERT INTO "Note" (
|
||||
id, "contactId", content, "createdAt", "updatedAt"
|
||||
)
|
||||
VALUES (
|
||||
gen_random_uuid(), %(contact_id)s, %(content)s, NOW(), NOW()
|
||||
)
|
||||
"""
|
||||
|
||||
self.db.execute_update(query, {
|
||||
"contact_id": contact_id,
|
||||
"content": note_content
|
||||
})
|
||||
|
||||
logger.info(f"Added new note for contact {contact_id}")
|
||||
|
||||
return contact_id
|
||||
|
||||
def run(self):
|
||||
"""Run the importer"""
|
||||
logger.info(f"Starting Meta Cartel member import from {self.csv_path}")
|
||||
|
||||
# Read members from CSV
|
||||
members = self.read_csv()
|
||||
|
||||
# Process members
|
||||
processed_count = 0
|
||||
for member in members:
|
||||
if self.process_member(member):
|
||||
processed_count += 1
|
||||
|
||||
logger.info(f"Processed {processed_count} members out of {len(members)} found")
|
||||
return processed_count
|
||||
|
||||
def main():
|
||||
"""Main function"""
|
||||
try:
|
||||
csv_path = os.path.join(
|
||||
os.path.dirname(os.path.abspath(__file__)),
|
||||
"metacartel-v2-hd_Members_1742164186.csv"
|
||||
)
|
||||
|
||||
importer = MetaCartelImporter(csv_path)
|
||||
processed_count = importer.run()
|
||||
logger.info(f"Import completed successfully. Processed {processed_count} members.")
|
||||
return 0
|
||||
except Exception as e:
|
||||
logger.exception(f"Error running importer: {e}")
|
||||
return 1
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
Loading…
Reference in New Issue