#!/usr/bin/env python3
"""
Database Connector

Utility for connecting to the PostgreSQL database and performing operations.
"""

import os
import sys
from typing import Any, Dict, List, Optional, Union

import psycopg2
from psycopg2 import sql
from psycopg2.extras import RealDictCursor
from dotenv import load_dotenv

# Load environment variables (e.g. from a local .env file) before they are read.
load_dotenv()


class DatabaseConnector:
    """Connector for the PostgreSQL database.

    A single connection is opened on construction with ``autocommit`` enabled,
    so every ``execute_*`` call commits on its own; helpers that issue several
    statements are therefore not atomic. The connector can be used as a
    context manager (``with DatabaseConnector() as db: ...``) to close the
    connection deterministically.
    """

    def __init__(self):
        """Initialize the database connector and open the connection.

        The URL is read from ``PYTHON_DATABASE_URL``; if that is unset or empty,
        ``DATABASE_URL`` is used instead with any ``?schema=...`` suffix removed.

        Raises:
            ValueError: If neither environment variable is set.
            psycopg2.Error: If the connection cannot be established.
        """
        self.db_url = os.getenv("PYTHON_DATABASE_URL")
        if not self.db_url:
            fallback_url = os.getenv("DATABASE_URL")
            if not fallback_url:
                raise ValueError("DATABASE_URL not found in environment variables")
            # The schema parameter (and everything after it) is dropped because
            # libpq does not accept it; a URL without it is used unchanged.
            self.db_url = fallback_url.split("?schema=")[0]

        # Set once the ContactSource table is known to exist (see
        # _ensure_contact_source_table) so the check is not repeated per call.
        self._contact_source_ready = False

        # Connect to the database
        self.conn = psycopg2.connect(self.db_url)
        self.conn.autocommit = True

    def close(self) -> None:
        """Close the database connection if it is open. Safe to call repeatedly."""
        conn = getattr(self, "conn", None)
        # psycopg2 sets ``closed`` to a nonzero value once the connection is closed.
        if conn is not None and not conn.closed:
            conn.close()

    def __enter__(self) -> "DatabaseConnector":
        """Return the connector itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Close the connection when leaving a ``with`` block."""
        self.close()

    def __del__(self):
        """Close the database connection when the object is destroyed."""
        # getattr inside close() covers the case where __init__ failed before
        # ``conn`` was assigned.
        self.close()

    def execute_query(self, query: Union[str, sql.Composable],
                      params: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
        """
        Execute a SQL query and return the results.

        Args:
            query: SQL query to execute (plain text or a ``psycopg2.sql`` composition)
            params: Parameters for the query

        Returns:
            List of dictionaries containing the query results; empty when the
            statement produces no result set.
        """
        with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute(query, params or {})
            # Statements without a result set (e.g. UPDATE without RETURNING, DDL)
            # leave ``description`` as None.
            if cursor.description:
                return cursor.fetchall()
            return []

    def execute_update(self, query: Union[str, sql.Composable],
                       params: Optional[Dict[str, Any]] = None) -> int:
        """
        Execute a SQL update query and return the number of affected rows.

        Args:
            query: SQL query to execute (plain text or a ``psycopg2.sql`` composition)
            params: Parameters for the query

        Returns:
            Number of affected rows as reported by ``cursor.rowcount``
        """
        with self.conn.cursor() as cursor:
            cursor.execute(query, params or {})
            return cursor.rowcount

    def upsert_contact(self, ethereum_address: str, ens_name: Optional[str] = None,
                       ethereum_address2: Optional[str] = None,
                       warpcast_address: Optional[str] = None,
                       farcaster: Optional[str] = None,
                       other_social: Optional[str] = None) -> str:
        """
        Insert or update a contact in the database.

        On conflict with an existing ``ethereumAddress``, only the non-NULL
        supplied fields overwrite the stored values (COALESCE).

        Args:
            ethereum_address: Ethereum address of the contact
            ens_name: ENS name of the contact, if available
            ethereum_address2: Second Ethereum address of the contact, if available
            warpcast_address: Warpcast address of the contact, if available
            farcaster: Farcaster handle of the contact, if available
            other_social: Other social media information of the contact, if available

        Returns:
            ID of the inserted or updated contact
        """
        query = """
            INSERT INTO "Contact" (
                id, "ethereumAddress", "ensName", "ethereumAddress2",
                "warpcastAddress", "farcaster", "otherSocial",
                "createdAt", "updatedAt"
            )
            VALUES (
                gen_random_uuid(), %(address)s, %(ens_name)s, %(address2)s,
                %(warpcast)s, %(farcaster)s, %(other_social)s,
                NOW(), NOW()
            )
            ON CONFLICT ("ethereumAddress") DO UPDATE SET
                "ensName" = COALESCE(EXCLUDED."ensName", "Contact"."ensName"),
                "ethereumAddress2" = COALESCE(EXCLUDED."ethereumAddress2", "Contact"."ethereumAddress2"),
                "warpcastAddress" = COALESCE(EXCLUDED."warpcastAddress", "Contact"."warpcastAddress"),
                "farcaster" = COALESCE(EXCLUDED."farcaster", "Contact"."farcaster"),
                "otherSocial" = COALESCE(EXCLUDED."otherSocial", "Contact"."otherSocial"),
                "updatedAt" = NOW()
            RETURNING id
        """
        result = self.execute_query(query, {
            "address": ethereum_address,
            "ens_name": ens_name,
            "address2": ethereum_address2,
            "warpcast": warpcast_address,
            "farcaster": farcaster,
            "other_social": other_social,
        })
        return result[0]["id"]

    def update_contact(self, contact_id: str, data: Dict[str, Any]) -> None:
        """
        Update a contact with additional information.

        Entries whose value is None are ignored. An ``"updatedAt"`` entry is
        also ignored because that column is always set to ``NOW()``. Nothing is
        executed when no updatable entries remain.

        Args:
            contact_id: ID of the contact to update
            data: Mapping of "Contact" column names to new values
        """
        assignments = []
        params: Dict[str, Any] = {"id": contact_id}
        for index, (column, value) in enumerate(data.items()):
            if value is None or column == "updatedAt":
                continue
            # Column names are quoted via sql.Identifier and values are bound
            # under generated placeholder names, so arbitrary keys can neither
            # inject SQL nor clobber the "id" parameter used in WHERE.
            placeholder = f"v{index}"
            assignments.append(
                sql.SQL("{} = {}").format(sql.Identifier(column), sql.Placeholder(placeholder))
            )
            params[placeholder] = value

        if not assignments:
            return

        assignments.append(sql.SQL('"updatedAt" = NOW()'))
        query = sql.SQL('UPDATE "Contact" SET {} WHERE id = %(id)s').format(
            sql.SQL(", ").join(assignments)
        )
        self.execute_update(query, params)

    def add_nft_holding(self, contact_id: str, contract_address: str, token_id: str,
                        collection_name: Optional[str] = None) -> None:
        """
        Add an NFT holding for a contact.

        Args:
            contact_id: ID of the contact
            contract_address: Contract address of the NFT
            token_id: Token ID of the NFT
            collection_name: Name of the NFT collection
        """
        query = """
            INSERT INTO "NftHolding" (
                id, "contactId", "contractAddress", "tokenId", "collectionName",
                "createdAt", "updatedAt"
            )
            VALUES (
                gen_random_uuid(), %(contact_id)s, %(contract_address)s, %(token_id)s,
                %(collection_name)s, NOW(), NOW()
            )
            ON CONFLICT ("contactId", "contractAddress", "tokenId") DO UPDATE SET
                "collectionName" = COALESCE(EXCLUDED."collectionName", "NftHolding"."collectionName"),
                "updatedAt" = NOW()
        """
        self.execute_update(query, {
            "contact_id": contact_id,
            "contract_address": contract_address,
            "token_id": token_id,
            "collection_name": collection_name,
        })

    def add_token_holding(self, contact_id: str, contract_address: str, balance: str,
                          token_symbol: Optional[str] = None) -> None:
        """
        Add a token holding for a contact.

        On conflict, the balance is always replaced while the symbol is only
        overwritten when a new one is supplied.

        Args:
            contact_id: ID of the contact
            contract_address: Contract address of the token
            balance: Token balance
            token_symbol: Symbol of the token
        """
        query = """
            INSERT INTO "TokenHolding" (
                id, "contactId", "contractAddress", "tokenSymbol", balance,
                "lastUpdated", "createdAt", "updatedAt"
            )
            VALUES (
                gen_random_uuid(), %(contact_id)s, %(contract_address)s, %(token_symbol)s,
                %(balance)s, NOW(), NOW(), NOW()
            )
            ON CONFLICT ("contactId", "contractAddress") DO UPDATE SET
                "tokenSymbol" = COALESCE(EXCLUDED."tokenSymbol", "TokenHolding"."tokenSymbol"),
                balance = EXCLUDED.balance,
                "lastUpdated" = NOW(),
                "updatedAt" = NOW()
        """
        self.execute_update(query, {
            "contact_id": contact_id,
            "contract_address": contract_address,
            "token_symbol": token_symbol,
            "balance": balance,
        })

    def add_dao_membership(self, contact_id: str, dao_name: str, dao_type: str,
                           joined_at: Optional[str] = None) -> None:
        """
        Add a DAO membership for a contact.

        Args:
            contact_id: ID of the contact
            dao_name: Name of the DAO
            dao_type: Type of the DAO
            joined_at: Date when the contact joined the DAO
        """
        query = """
            INSERT INTO "DaoMembership" (
                id, "contactId", "daoName", "daoType", "joinedAt",
                "createdAt", "updatedAt"
            )
            VALUES (
                gen_random_uuid(), %(contact_id)s, %(dao_name)s, %(dao_type)s,
                %(joined_at)s, NOW(), NOW()
            )
            ON CONFLICT ("contactId", "daoName") DO UPDATE SET
                "daoType" = COALESCE(EXCLUDED."daoType", "DaoMembership"."daoType"),
                "joinedAt" = COALESCE(EXCLUDED."joinedAt", "DaoMembership"."joinedAt"),
                "updatedAt" = NOW()
        """
        self.execute_update(query, {
            "contact_id": contact_id,
            "dao_name": dao_name,
            "dao_type": dao_type,
            "joined_at": joined_at,
        })

    def add_tag_to_contact(self, contact_id: str, tag_name: str,
                           color: Optional[str] = None) -> None:
        """
        Add a tag to a contact, creating the tag first if needed.

        Args:
            contact_id: ID of the contact
            tag_name: Name of the tag
            color: Color of the tag
        """
        # First, ensure the tag exists (and fetch its id either way).
        tag_query = """
            INSERT INTO "Tag" (id, name, color, "createdAt", "updatedAt")
            VALUES (gen_random_uuid(), %(name)s, %(color)s, NOW(), NOW())
            ON CONFLICT (name) DO UPDATE SET
                color = COALESCE(EXCLUDED.color, "Tag".color),
                "updatedAt" = NOW()
            RETURNING id
        """
        tag_result = self.execute_query(tag_query, {"name": tag_name, "color": color})
        tag_id = tag_result[0]["id"]

        # Then, add the tag to the contact; an existing link is left untouched.
        relation_query = """
            INSERT INTO "TagsOnContacts" ("contactId", "tagId", "assignedAt")
            VALUES (%(contact_id)s, %(tag_id)s, NOW())
            ON CONFLICT ("contactId", "tagId") DO NOTHING
        """
        self.execute_update(relation_query, {"contact_id": contact_id, "tag_id": tag_id})

    def add_note_to_contact(self, contact_id: str, content: str) -> None:
        """
        Add a note to a contact.

        Args:
            contact_id: ID of the contact
            content: Content of the note
        """
        query = """
            INSERT INTO "Note" (id, "contactId", content, "createdAt", "updatedAt")
            VALUES (gen_random_uuid(), %(contact_id)s, %(content)s, NOW(), NOW())
        """
        self.execute_update(query, {"contact_id": contact_id, "content": content})

    def _ensure_contact_source_table(self) -> None:
        """Create the "ContactSource" table and its indexes if the table is missing.

        The existence check runs at most once per connector instance. If the
        table is dropped externally afterwards, later inserts will fail.
        """
        if getattr(self, "_contact_source_ready", False):
            return

        exists_query = """
            SELECT EXISTS (
                SELECT FROM information_schema.tables
                WHERE table_schema = 'public'
                AND table_name = 'ContactSource'
            )
        """
        result = self.execute_query(exists_query)
        if not result[0]["exists"]:
            # IF NOT EXISTS keeps this safe when another process creates the
            # table between the check above and this statement.
            create_query = """
                CREATE TABLE IF NOT EXISTS "ContactSource" (
                    id TEXT PRIMARY KEY,
                    "contactId" TEXT NOT NULL,
                    "dataSourceId" TEXT NOT NULL,
                    "createdAt" TIMESTAMP NOT NULL,
                    "updatedAt" TIMESTAMP NOT NULL,
                    FOREIGN KEY ("contactId") REFERENCES "Contact"(id) ON DELETE CASCADE,
                    FOREIGN KEY ("dataSourceId") REFERENCES "DataSource"(id) ON DELETE CASCADE,
                    UNIQUE("contactId", "dataSourceId")
                );
                CREATE INDEX IF NOT EXISTS "ContactSource_contactId_idx"
                    ON "ContactSource"("contactId");
                CREATE INDEX IF NOT EXISTS "ContactSource_dataSourceId_idx"
                    ON "ContactSource"("dataSourceId");
            """
            self.execute_update(create_query)

        self._contact_source_ready = True

    def link_contact_to_data_source(self, contact_id: str, data_source_id: str) -> None:
        """
        Link a contact to a data source in the ContactSource table.

        The table is created on first use if it does not exist yet. An existing
        link only has its ``updatedAt`` refreshed.

        Args:
            contact_id: ID of the contact
            data_source_id: ID of the data source
        """
        self._ensure_contact_source_table()

        query = """
            INSERT INTO "ContactSource" (
                id, "contactId", "dataSourceId", "createdAt", "updatedAt"
            )
            VALUES (
                gen_random_uuid(), %(contact_id)s, %(data_source_id)s, NOW(), NOW()
            )
            ON CONFLICT ("contactId", "dataSourceId") DO UPDATE SET
                "updatedAt" = NOW()
        """
        self.execute_update(query, {
            "contact_id": contact_id,
            "data_source_id": data_source_id,
        })

    def get_contact_sources(self, contact_id: str) -> List[Dict[str, Any]]:
        """
        Get all data sources for a contact.

        Args:
            contact_id: ID of the contact

        Returns:
            List of data sources (id, name, type, description) for the contact
        """
        query = """
            SELECT ds.id, ds.name, ds.type, ds.description
            FROM "ContactSource" cs
            JOIN "DataSource" ds ON cs."dataSourceId" = ds.id
            WHERE cs."contactId" = %(contact_id)s
        """
        return self.execute_query(query, {"contact_id": contact_id})

    def upsert_data_source(self, name: str, source_type: str,
                           description: Optional[str] = None) -> str:
        """
        Insert or update a data source in the database.

        Args:
            name: Name of the data source
            source_type: Type of the data source
            description: Description of the data source

        Returns:
            ID of the inserted or updated data source
        """
        query = """
            INSERT INTO "DataSource" (id, name, type, description, "createdAt", "updatedAt")
            VALUES (gen_random_uuid(), %(name)s, %(type)s, %(description)s, NOW(), NOW())
            ON CONFLICT (name) DO UPDATE SET
                type = EXCLUDED.type,
                description = COALESCE(EXCLUDED.description, "DataSource".description),
                "updatedAt" = NOW()
            RETURNING id
        """
        result = self.execute_query(query, {
            "name": name,
            "type": source_type,
            "description": description,
        })
        return result[0]["id"]

    def create_scraping_job(self, source_name: str, status: str = "pending") -> str:
        """
        Create a new scraping job.

        ``startedAt`` is set to the current time only when the initial status
        is ``'running'``.

        Args:
            source_name: Name of the data source
            status: Initial status of the job

        Returns:
            ID of the created job
        """
        query = """
            INSERT INTO "ScrapingJob" (
                id, "sourceName", status, "startedAt", "createdAt", "updatedAt"
            )
            VALUES (
                gen_random_uuid(), %(source_name)s, %(status)s,
                CASE WHEN %(status)s = 'running' THEN NOW() ELSE NULL END,
                NOW(), NOW()
            )
            RETURNING id
        """
        result = self.execute_query(query, {"source_name": source_name, "status": status})
        return result[0]["id"]

    def update_scraping_job(self, job_id: str, status: str, records_processed: int = 0,
                            records_added: int = 0, records_updated: int = 0,
                            error_message: Optional[str] = None) -> None:
        """
        Update a scraping job.

        The record counters are incremented by the given amounts, not replaced.
        ``startedAt`` is set the first time the job becomes ``'running'``, and
        ``completedAt`` is set whenever the status is ``'completed'`` or
        ``'failed'``. A None ``error_message`` keeps the stored message.

        Args:
            job_id: ID of the job to update
            status: New status of the job
            records_processed: Number of records processed (added to the total)
            records_added: Number of records added (added to the total)
            records_updated: Number of records updated (added to the total)
            error_message: Error message if the job failed
        """
        query = """
            UPDATE "ScrapingJob"
            SET
                status = %(status)s,
                "startedAt" = CASE
                    WHEN %(status)s = 'running' AND "startedAt" IS NULL THEN NOW()
                    ELSE "startedAt"
                END,
                "completedAt" = CASE
                    WHEN %(status)s IN ('completed', 'failed') THEN NOW()
                    ELSE "completedAt"
                END,
                "recordsProcessed" = "recordsProcessed" + %(records_processed)s,
                "recordsAdded" = "recordsAdded" + %(records_added)s,
                "recordsUpdated" = "recordsUpdated" + %(records_updated)s,
                "errorMessage" = COALESCE(%(error_message)s, "errorMessage"),
                "updatedAt" = NOW()
            WHERE id = %(job_id)s
        """
        self.execute_update(query, {
            "job_id": job_id,
            "status": status,
            "records_processed": records_processed,
            "records_added": records_added,
            "records_updated": records_updated,
            "error_message": error_message,
        })