#!/usr/bin/env python3
"""
Import Public Haus Members from Optimism Blockchain using Events

This script fetches members of Public Haus DAO by querying events from the
Optimism blockchain, imports them into the database, and links them to the
Public Haus DAO.

Usage:
    python import_public_haus_members_events.py
"""

import os
import sys
import logging
import json
import time
from typing import Dict, Any, List, Optional, Set
from web3 import Web3
from dotenv import load_dotenv

# Add parent directory to path to import utils
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from utils.db_connector import DatabaseConnector
from utils.logger import setup_logger

# Load environment variables
load_dotenv()

# Setup logging
logger = setup_logger("public_haus_importer")

# Constants
PUBLIC_HAUS_DAO_ID = "0xf5d6b637a9185707f52d40d452956ca49018247a"  # Public Haus DAO ID on Optimism

# Optimism has ~1 block every 2 seconds
BLOCKS_PER_DAY = 43200  # 86400 seconds / 2 seconds per block
# How far back the event query reaches (approximately 6 months)
LOOKBACK_DAYS = 180
# Number of most recent blocks scanned by the transaction-based fallback
FALLBACK_SCAN_BLOCKS = 1000

# Events queried for member addresses, in the order they are tried
MEMBERSHIP_EVENT_NAMES = (
    "MemberAdded",
    "ProcessProposal",
    "SharingEvent",
    "MembershipProposalSubmitted",
)
# Event argument names that may carry an address; the first one present wins
ADDRESS_ARG_NAMES = ("member", "memberAddress", "applicant")

# Moloch DAO V3 ABI (partial, only what we need for events)
# NOTE(review): confirm these event signatures against the deployed contract.
MOLOCH_V3_ABI = [
    {
        "anonymous": False,
        "inputs": [
            {"indexed": True, "internalType": "address", "name": "member", "type": "address"},
            {"indexed": False, "internalType": "uint256", "name": "shares", "type": "uint256"}
        ],
        "name": "SharingEvent",
        "type": "event"
    },
    {
        "anonymous": False,
        "inputs": [
            {"indexed": True, "internalType": "address", "name": "applicant", "type": "address"}
        ],
        "name": "MembershipProposalSubmitted",
        "type": "event"
    },
    {
        "anonymous": False,
        "inputs": [
            {"indexed": True, "internalType": "address", "name": "member", "type": "address"}
        ],
        "name": "MemberAdded",
        "type": "event"
    },
    {
        "anonymous": False,
        "inputs": [
            {"indexed": True, "internalType": "address", "name": "memberAddress", "type": "address"},
            {"indexed": False, "internalType": "uint256", "name": "shares", "type": "uint256"},
            {"indexed": False, "internalType": "uint256", "name": "loot", "type": "uint256"}
        ],
        "name": "ProcessProposal",
        "type": "event"
    }
]


class PublicHausImporter:
    """Importer for Public Haus members from Optimism blockchain using events"""

    def __init__(self):
        """
        Initialize the importer

        Connects to the database and the Optimism RPC endpoint, binds the DAO
        contract, registers the data source and opens a scraping job.

        Raises:
            ValueError: If OPTIMISM_RPC_URL is not set or the RPC endpoint
                cannot be reached.
        """
        # Initialize database
        self.db = DatabaseConnector()

        # Initialize Web3
        optimism_rpc_url = os.getenv("OPTIMISM_RPC_URL")
        if not optimism_rpc_url:
            raise ValueError("OPTIMISM_RPC_URL environment variable not set")

        self.web3 = Web3(Web3.HTTPProvider(optimism_rpc_url))
        if not self.web3.is_connected():
            raise ValueError("Failed to connect to Optimism RPC")

        logger.info(f"Connected to Optimism: {self.web3.is_connected()}")

        # Initialize contract
        self.contract = self.web3.eth.contract(
            address=self.web3.to_checksum_address(PUBLIC_HAUS_DAO_ID),
            abi=MOLOCH_V3_ABI
        )

        # Register data source
        self.data_source_id = self.register_data_source()

        # Initialize scraping job
        self.job_id = self.db.create_scraping_job(
            source_name="Public Haus DAO Blockchain Events",
            status="running"
        )
        logger.info(f"Created scraping job with ID: {self.job_id}")

    def register_data_source(self) -> str:
        """
        Register the Public Haus data source in the database

        Returns:
            Whatever identifier the database connector's upsert returns
        """
        return self.db.upsert_data_source(
            name="Public Haus DAO Blockchain",
            source_type="blockchain",
            description="Public Haus DAO members from Optimism blockchain"
        )

    def _collect_addresses_from_events(self, start_block: int, latest_block: int) -> Set[str]:
        """Collect addresses named in membership-related events between the two blocks (inclusive)"""
        addresses: Set[str] = set()

        # Try different event types that might indicate membership
        for event_name in MEMBERSHIP_EVENT_NAMES:
            try:
                event_filter = self.contract.events[event_name].create_filter(
                    fromBlock=start_block,
                    toBlock=latest_block
                )
                events = event_filter.get_all_entries()
                logger.info(f"Found {len(events)} {event_name} events")

                for event in events:
                    # Only the first matching argument name is used per event
                    for arg_name in ADDRESS_ARG_NAMES:
                        if hasattr(event.args, arg_name):
                            addresses.add(getattr(event.args, arg_name))
                            break
            except Exception as e:
                # Best effort: a failure on one event type must not stop the others
                logger.warning(f"Error fetching {event_name} events: {e}")
                continue

        return addresses

    def _collect_addresses_from_recent_transactions(self, latest_block: int) -> Set[str]:
        """Collect senders of transactions addressed to the DAO in the most recent blocks"""
        addresses: Set[str] = set()
        dao_address = PUBLIC_HAUS_DAO_ID.lower()

        # Clamp at 0 so a short chain never asks for a negative block number,
        # and include latest_block itself, as the event query does.
        first_block = max(0, latest_block - FALLBACK_SCAN_BLOCKS)
        for block_num in range(first_block, latest_block + 1):
            block = self.web3.eth.get_block(block_num, full_transactions=True)
            for tx in block.transactions:
                # tx.to is falsy for transactions without a recipient
                if tx.to and tx.to.lower() == dao_address:
                    addresses.add(tx['from'])

        return addresses

    def fetch_members_from_events(self) -> List[Dict[str, Any]]:
        """
        Fetch Public Haus members by querying events

        Queries the membership-related events over roughly the last
        LOOKBACK_DAYS days. If no address is found that way, falls back to
        scanning the most recent blocks for transactions sent to the DAO.

        Returns:
            List of member information, one dict per unique address, sorted
            by address. Shares and loot are always 0 because they are not
            read from the events.

        Raises:
            Exception: Any error outside the per-event queries is logged and
                re-raised.
        """
        try:
            # Get the latest block number
            latest_block = self.web3.eth.block_number

            # Calculate the starting block (approximately 6 months ago)
            start_block = max(0, latest_block - (BLOCKS_PER_DAY * LOOKBACK_DAYS))

            logger.info(f"Fetching events from block {start_block} to {latest_block}")

            # Get all member-related events
            member_addresses = self._collect_addresses_from_events(start_block, latest_block)

            # If we didn't find any members through events, try a different approach
            if not member_addresses:
                logger.warning("No members found through events, trying alternative approach")
                member_addresses = self._collect_addresses_from_recent_transactions(latest_block)

            # Convert addresses to member objects; sorted so runs are reproducible
            members = [
                {
                    "address": address,
                    "dao": "Public Haus",
                    "shares": 0,  # We don't have share information from events
                    "loot": 0  # We don't have loot information from events
                }
                for address in sorted(member_addresses)
            ]

            logger.info(f"Found {len(members)} unique members")
            return members

        except Exception as e:
            logger.error(f"Error fetching members from events: {e}")
            raise

    def process_member(self, member: Dict[str, Any]) -> Optional[str]:
        """
        Process a member and import into the database

        Looks up the contact by Ethereum address, creating it if absent, then
        links it to the data source and adds the DAO tag and a membership note.

        Args:
            member: Member information; must contain "address" and "dao"

        Returns:
            Contact ID if successful (for new and existing contacts alike),
            None otherwise
        """
        try:
            # Extract member information
            address = member["address"]
            dao_name = member["dao"]

            # Check if contact exists
            contact_id = self.db.get_contact_by_ethereum_address(address)

            if contact_id:
                logger.info(f"Contact already exists for address {address}")
            else:
                # Create new contact
                contact_id = self.db.create_contact(
                    name=f"Public Haus Member {address[:8]}",
                    ethereum_address=address,
                    email=None,
                    twitter=None,
                    github=None,
                    telegram=None,
                    discord=None
                )
                logger.info(f"Created new contact with ID {contact_id} for address {address}")

            # Link contact to data source
            self.db.link_contact_to_data_source(
                contact_id=contact_id,
                data_source_id=self.data_source_id,
                external_id=address
            )

            # Add tag for the DAO
            self.db.add_tag_to_contact(
                contact_id=contact_id,
                tag_name=dao_name
            )

            # Add note about membership
            self.db.add_note_to_contact(
                contact_id=contact_id,
                note=f"Member of {dao_name} DAO on Optimism"
            )

            return contact_id

        except Exception as e:
            # .get() keeps the handler from raising when "address" is the missing key
            logger.error(f"Error processing member {member.get('address')}: {e}")
            return None

    def run(self) -> int:
        """
        Run the importer

        Fetches the members, processes each one, and records the outcome on
        the scraping job.

        Returns:
            Number of imported members, i.e. members for which process_member
            returned a contact ID (already-existing contacts included)

        Raises:
            Exception: Any failure is logged, recorded on the scraping job as
                "failed", and re-raised.
        """
        try:
            # Fetch members
            members = self.fetch_members_from_events()

            # Process members
            imported_count = 0
            for member in members:
                contact_id = self.process_member(member)
                if contact_id:
                    imported_count += 1

                # Sleep to avoid rate limiting
                time.sleep(0.1)

            # Update scraping job
            self.db.update_scraping_job(
                job_id=self.job_id,
                status="completed",
                records_processed=len(members),
                records_added=imported_count,
                records_updated=0
            )

            logger.info(f"Imported {imported_count} members out of {len(members)}")
            return imported_count

        except Exception as e:
            logger.error(f"Error importing members: {e}")

            # Update scraping job with error
            self.db.update_scraping_job(
                job_id=self.job_id,
                status="failed",
                error_message=str(e)
            )
            raise


def main():
    """Main entry point: run the importer and exit with status 1 on any failure"""
    try:
        importer = PublicHausImporter()
        imported_count = importer.run()
        logger.info(f"Successfully imported {imported_count} Public Haus members")
    except Exception as e:
        logger.error(f"Error importing members: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()