236 lines
8.3 KiB
Python
Executable File
236 lines
8.3 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
Public Nouns NFT Holders Scraper

This script fetches holders of the Public Nouns NFT contract and stores their
Ethereum addresses in the database. It also attempts to resolve ENS names
for the addresses.

Usage:
    python public_nouns_scraper.py
"""
|
|
|
|
import os
|
|
import sys
|
|
import argparse
|
|
import json
|
|
import time
|
|
from datetime import datetime
|
|
from typing import Dict, List, Optional, Any
|
|
import requests
|
|
from web3 import Web3
|
|
from dotenv import load_dotenv
|
|
|
|
# Add parent directory to path to import utils
|
|
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
from utils.db_connector import DatabaseConnector
|
|
from utils.ens_resolver import ENSResolver
|
|
from utils.logger import setup_logger
|
|
|
|
# Populate os.environ via python-dotenv before anything reads the API keys.
load_dotenv()

# Module-wide logger shared by the scraper class and the CLI entry point.
logger = setup_logger("public_nouns_scraper")
|
|
|
|
class PublicNounsHoldersScraper:
    """Scraper for Public Nouns NFT holders.

    For each token ID in a range, the owner is looked up through Alchemy's
    ``getOwnersForToken`` HTTP endpoint. Every holder found is then written
    to the database through ``self.db`` together with whatever ENS data
    ``self.ens_resolver`` returns for the address.
    """

    # Seconds to wait for one Alchemy HTTP response. Without a timeout,
    # ``requests`` waits indefinitely and a single stalled connection would
    # hang the whole scrape.
    REQUEST_TIMEOUT = 10

    def __init__(
        self,
        contract_address: str = "0x93ecac71499147627DFEc6d0E494d50fCFFf10EE",
        collection_name: str = "Public Nouns",
    ):
        """
        Initialize the Public Nouns NFT holders scraper.

        Exits the process with status 1 (via ``sys.exit``) when either
        ``ETHEREUM_ETHERSCAN_API_KEY`` or ``ALCHEMY_API_KEY`` is missing
        from the environment.

        Args:
            contract_address: Ethereum address of the Public Nouns NFT contract
            collection_name: Name of the NFT collection
        """
        self.etherscan_api_key = os.getenv("ETHEREUM_ETHERSCAN_API_KEY")
        self.alchemy_api_key = os.getenv("ALCHEMY_API_KEY")

        # Validate the keys before building any client, so that a missing key
        # never produces a provider URL ending in "None" or opens a database
        # connection that is immediately abandoned.
        # NOTE(review): the Etherscan key is required here but is not used by
        # any method of this class.
        if not self.etherscan_api_key:
            logger.error("ETHEREUM_ETHERSCAN_API_KEY not found in environment variables")
            sys.exit(1)
        if not self.alchemy_api_key:
            logger.error("ALCHEMY_API_KEY not found in environment variables")
            sys.exit(1)

        self.contract_address = Web3.to_checksum_address(contract_address)
        self.collection_name = collection_name
        self.web3 = Web3(
            Web3.HTTPProvider(f"https://eth-mainnet.g.alchemy.com/v2/{self.alchemy_api_key}")
        )
        self.db = DatabaseConnector()
        self.ens_resolver = ENSResolver(self.web3)

        # Register data source
        self.register_data_source()

    @staticmethod
    def _encode_token_id(token_id: Any) -> Any:
        """Return an int token ID as a hex string; pass other values through unchanged."""
        return hex(token_id) if isinstance(token_id, int) else token_id

    @staticmethod
    def _farcaster_json(ens_profile: Any) -> Optional[str]:
        """Return the profile's ``"farcaster"`` entry as a JSON string, or None if absent."""
        if ens_profile and "farcaster" in ens_profile:
            return json.dumps(ens_profile["farcaster"])
        return None

    def register_data_source(self) -> None:
        """Register this NFT collection as a data source in the database."""
        self.db.upsert_data_source(
            name=f"NFT:{self.collection_name}",
            source_type="NFT",
            description=f"Holders of {self.collection_name} NFT ({self.contract_address})",
        )

    def get_token_owner(self, token_id: int) -> Optional[str]:
        """
        Get the owner of a specific token ID.

        Best-effort: any request or parsing error is logged and reported as
        ``None`` rather than raised, so one bad token does not stop a scrape.

        Args:
            token_id: The token ID to check

        Returns:
            The first address in the response's ``owners`` list, or None if
            the request failed, returned a non-200 status, or listed no owners
        """
        url = f"https://eth-mainnet.g.alchemy.com/nft/v2/{self.alchemy_api_key}/getOwnersForToken"
        params = {
            "contractAddress": self.contract_address,
            "tokenId": self._encode_token_id(token_id),
        }

        try:
            response = requests.get(url, params=params, timeout=self.REQUEST_TIMEOUT)
            if response.status_code != 200:
                # Leave a trace for throttling/auth failures instead of
                # silently treating them as "no owner".
                logger.warning(
                    f"Unexpected HTTP status {response.status_code} fetching owner for token {token_id}"
                )
                return None

            owners = response.json().get("owners", [])
            return owners[0] if owners else None
        except Exception as e:
            logger.error(f"Error fetching owner for token {token_id}: {str(e)}")
            return None

    def get_token_holders(self, max_token_id: int = 465) -> List[Dict[str, Any]]:
        """
        Fetch all token holders for the Public Nouns NFT contract.

        A scraping job is created through ``self.db`` before the scan starts
        and updated to "completed" or "failed" when the scan ends.

        Args:
            max_token_id: The maximum token ID to check (default: 465);
                IDs 0 through max_token_id inclusive are queried

        Returns:
            List of dictionaries with keys "address", "token_id" (as a
            string) and "collection_name"; an empty list if the scan raised
        """
        logger.info(f"Fetching token holders for {self.collection_name} ({self.contract_address})")

        # Start a scraping job
        job_id = self.db.create_scraping_job(
            source_name=f"NFT:{self.collection_name}",
            status="running",
        )

        holders = []
        records_processed = 0
        records_added = 0

        try:
            # Iterate through token IDs from 0 to max_token_id
            for token_id in range(max_token_id + 1):
                records_processed += 1

                # Log progress every 10 tokens
                if token_id % 10 == 0:
                    logger.info(f"Processing token ID {token_id}/{max_token_id}")

                # Get the owner of this token
                owner = self.get_token_owner(token_id)
                if owner:
                    holders.append({
                        "address": owner,
                        "token_id": str(token_id),
                        "collection_name": self.collection_name,
                    })
                    records_added += 1

                # Rate limiting to avoid API throttling
                time.sleep(0.2)

            # Update job with success
            self.db.update_scraping_job(
                job_id=job_id,
                status="completed",
                records_processed=records_processed,
                records_added=records_added,
            )

        except Exception as e:
            logger.error(f"Error fetching token holders: {str(e)}")
            self.db.update_scraping_job(job_id, "failed", error_message=str(e))
            return []

        logger.info(f"Found {len(holders)} token holders")
        return holders

    def process_holders(self, holders: List[Dict[str, Any]]) -> None:
        """
        Process the list of holders and store in database.

        For every entry the address is checksummed, its ENS name (and, when
        a name exists, its ENS profile) is looked up, the contact is
        upserted, and the NFT holding is recorded against that contact.

        Args:
            holders: List of dictionaries containing token ID and holder address
        """
        logger.info(f"Processing {len(holders)} holders")

        for holder in holders:
            address = Web3.to_checksum_address(holder["address"])
            token_id = holder["token_id"]

            # Try to resolve ENS name, then the ENS profile if a name exists
            ens_name = self.ens_resolver.get_ens_name(address)
            ens_profile = self.ens_resolver.get_ens_profile(ens_name) if ens_name else None

            # Check for Farcaster information in the ENS profile
            farcaster_info = self._farcaster_json(ens_profile)

            # Store in database
            contact_id = self.db.upsert_contact(
                ethereum_address=address,
                ens_name=ens_name,
                farcaster=farcaster_info,
            )

            # Add NFT holding
            self.db.add_nft_holding(
                contact_id=contact_id,
                contract_address=self.contract_address,
                token_id=token_id,
                collection_name=self.collection_name,
            )

            # If we have an ENS name, try to get additional profile information
            if ens_name:
                self.ens_resolver.update_contact_from_ens(contact_id, ens_name)

            # Rate limiting to avoid API throttling
            time.sleep(0.1)

    def run(self, max_token_id: int = 465) -> None:
        """
        Run the scraper to fetch and process Public Nouns NFT holders.

        Args:
            max_token_id: The maximum token ID to check (default: 465)
        """
        holders = self.get_token_holders(max_token_id)
        if holders:
            self.process_holders(holders)
            logger.info("Public Nouns NFT holders scraping completed successfully")
        else:
            logger.warning("No holders found or error occurred")
|
|
|
|
def main():
    """Parse the command line and run the Public Nouns holders scraper.

    Accepts one optional flag, ``--max-token-id`` (int, default 465), whose
    value is passed to ``PublicNounsHoldersScraper.run``.
    """
    cli = argparse.ArgumentParser(description="Scrape Public Nouns NFT holders")
    cli.add_argument(
        "--max-token-id",
        type=int,
        default=465,
        help="Maximum token ID to check (default: 465)",
    )
    options = cli.parse_args()

    PublicNounsHoldersScraper().run(options.max_token_id)


if __name__ == "__main__":
    main()