Nostr Discovery

Routstr Core integrates with Nostr (Notes and Other Stuff Transmitted by Relays) for decentralized provider discovery. This enables users to find Routstr nodes without relying on centralized directories.

Overview

Nostr integration provides:

  • Decentralized Discovery: Find providers through relay network
  • Cryptographic Identity: Providers identified by public keys
  • Real-time Updates: Live provider status and pricing
  • Censorship Resistance: No central point of control

How It Works

graph LR
    A[Routstr Node] --> B[Nostr Relay]
    B --> C[Nostr Relay]
    B --> D[Nostr Relay]

    E[User Client] --> B
    E --> C
    E --> D

    B --> F[Provider List]
    C --> F
    D --> F

Providers announce themselves by publishing signed events to Nostr relays. Clients can query these relays to discover available providers.
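At the wire level this announce/query exchange uses plain NIP-01 JSON frames over WebSockets; a minimal sketch of the two frames involved (the "id" and "sig" values are placeholders, not real signatures):

```python
import json

# A provider publishes its signed announcement to each connected relay.
# In practice "id" and "sig" are computed over the serialized event
# with the provider's private key.
publish_frame = ["EVENT", {
    "kind": 31990,
    "pubkey": "provider_pubkey_hex",
    "content": "{...provider info as a JSON string...}",
    "tags": [["d", "routstr"]],
    "id": "...",
    "sig": "...",
}]

# A client requests stored provider announcements from a relay.
subscribe_frame = ["REQ", "sub1", {"kinds": [31990], "#d": ["routstr"]}]

# Both frames travel as JSON text over the WebSocket connection.
print(json.dumps(subscribe_frame))
```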

Provider Configuration

Setting Up Nostr Identity

  1. Generate Nostr Keys
# Using nostril or similar tool
nostril --generate-keypair

# Output:
# Private key (nsec): nsec1abc...
# Public key (npub): npub1xyz...
  2. Configure Environment
# .env
NPUB=npub1xyz...  # Your public key
NSEC=nsec1abc...  # Your private key (keep secret!)
NAME=Lightning AI Gateway
DESCRIPTION=Fast and reliable AI API with Bitcoin payments
HTTP_URL=https://api.lightning-ai.com
ONION_URL=http://lightningai.onion
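A node cannot announce itself without these values, so it is worth validating them at startup; a hypothetical sketch (REQUIRED_VARS is an assumed minimum set, not Routstr's actual check):

```python
# Hypothetical startup check; adjust REQUIRED_VARS to your deployment
REQUIRED_VARS = ["NSEC", "NAME", "HTTP_URL"]

def check_config(env: dict) -> list[str]:
    """Return the names of required variables that are missing or empty."""
    return [v for v in REQUIRED_VARS if not env.get(v)]

# A config that forgot HTTP_URL:
print(check_config({"NSEC": "nsec1abc...", "NAME": "Lightning AI Gateway"}))
# -> ['HTTP_URL']
```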

Publishing to Nostr

Routstr automatically publishes provider information to configured relays:

# Published event structure (NIP-89)
# Note: on the wire, "content" is serialized as a JSON string
{
    "kind": 31990,  # Application handler event
    "pubkey": "your_public_key",
    "content": {
        "name": "Lightning AI Gateway",
        "description": "Fast and reliable AI API",
        "endpoints": {
            "http": "https://api.lightning-ai.com",
            "onion": "http://lightningai.onion"
        },
        "models": ["gpt-3.5-turbo", "gpt-4", "claude-3"],
        "pricing": {
            "gpt-3.5-turbo": {
                "prompt_sats_per_1k": 3,
                "completion_sats_per_1k": 4
            }
        },
        "cashu_mints": [
            "https://mint.minibits.cash/Bitcoin"
        ]
    },
    "tags": [
        ["d", "routstr"],
        ["t", "ai-api"],
        ["t", "bitcoin"],
        ["p", "payment-proxy"]
    ]
}

Relay Configuration

Configure which relays to publish to:

# Default relays
DEFAULT_RELAYS = [
    "wss://relay.damus.io",
    "wss://relay.nostr.band",
    "wss://relay.routstr.com",
    "wss://nos.lol"
]

# Custom relay configuration
RELAYS=wss://relay1.com,wss://relay2.com

Client Discovery

Using the Discovery Endpoint

Find providers through the API:

GET /v1/providers

Response:
{
  "providers": [
    {
      "name": "Lightning AI Gateway",
      "npub": "npub1xyz...",
      "description": "Fast and reliable AI API",
      "endpoints": {
        "http": "https://api.lightning-ai.com",
        "onion": "http://lightningai.onion"
      },
      "models": ["gpt-3.5-turbo", "gpt-4"],
      "pricing": {
        "gpt-3.5-turbo": {
          "prompt_sats_per_1k": 3,
          "completion_sats_per_1k": 4
        }
      },
      "last_seen": "2024-01-01T12:00:00Z",
      "reliability_score": 0.99
    }
  ]
}

Direct Nostr Queries

Query Nostr relays directly:

import json
import websocket

def discover_providers(relay_url: str):
    """Discover Routstr providers from Nostr relay."""
    ws = websocket.create_connection(relay_url)

    # Subscribe to provider events (NIP-01 filter: "#d" matches the "d" tag)
    subscription = {
        "kinds": [31990],
        "#d": ["routstr"]
    }

    ws.send(json.dumps(["REQ", "sub1", subscription]))

    providers = []
    while True:
        response = json.loads(ws.recv())
        if response[0] == "EVENT":
            event = response[2]
            providers.append(parse_provider_event(event))
        elif response[0] == "EOSE":  # End of stored events
            break

    ws.close()
    return providers

JavaScript/TypeScript

import { SimplePool, nip19 } from 'nostr-tools';

async function discoverProviders(): Promise<Provider[]> {
    const pool = new SimplePool();
    const relays = [
        'wss://relay.damus.io',
        'wss://relay.nostr.band'
    ];

    const filter = {
        kinds: [31990],
        '#d': ['routstr']
    };

    // pool.list() is the nostr-tools v1 API; newer releases use querySync()
    const events = await pool.list(relays, [filter]);

    // event.content arrives as a JSON string and must be parsed
    return events.map(event => {
        const content = JSON.parse(event.content);
        return {
            name: content.name,
            npub: nip19.npubEncode(event.pubkey),
            url: content.endpoints.http,
            models: content.models,
            pricing: content.pricing
        };
    });
}

Provider Ranking

Reliability Scoring

Providers are ranked based on:

class ProviderScore:
    def calculate(self, provider: Provider) -> float:
        score = 1.0

        # Uptime (based on recent checks)
        uptime_ratio = provider.successful_pings / provider.total_pings
        score *= uptime_ratio

        # Response time
        if provider.avg_response_time < 500:  # ms
            score *= 1.0
        elif provider.avg_response_time < 1000:
            score *= 0.9
        else:
            score *= 0.7

        # Model availability
        model_score = len(provider.models) / 10  # Max 10 models
        score *= min(1.0, 0.5 + model_score * 0.5)

        # Price competitiveness
        if provider.is_cheapest_for_any_model():
            score *= 1.1

        return min(1.0, score)

Provider Selection

Choose optimal provider:

def select_provider(
    providers: list[Provider],
    model: str,
    requirements: dict
) -> Provider | None:
    """Select best provider for requirements."""

    # Filter by model availability
    candidates = [p for p in providers if model in p.models]

    # Filter by requirements
    if requirements.get('tor_required'):
        candidates = [p for p in candidates if p.onion_url]

    if requirements.get('max_price_per_1k'):
        max_price = requirements['max_price_per_1k']
        candidates = [
            p for p in candidates
            if p.pricing[model]['prompt_sats_per_1k'] <= max_price
        ]

    # Sort by score
    candidates.sort(key=lambda p: p.reliability_score, reverse=True)

    return candidates[0] if candidates else None

Publishing Updates

Automatic Updates

Routstr publishes updates when:

  • Node starts up
  • Configuration changes
  • Models are added/removed
  • Pricing updates

Manual Publishing

Force publish current state:

async def publish_provider_info():
    """Manually publish provider information."""
    event = create_provider_event(
        name=os.getenv("NAME"),
        description=os.getenv("DESCRIPTION"),
        models=get_available_models(),
        pricing=get_current_pricing()
    )

    await publish_to_relays(event, RELAYS)

Event Lifecycle

# Publish every 6 hours
@periodic_task(hours=6)
async def update_nostr_presence():
    """Keep provider information fresh."""
    try:
        await publish_provider_info()
        logger.info("Updated Nostr presence")
    except Exception as e:
        logger.error(f"Failed to update Nostr: {e}")

# Delete on shutdown
async def remove_nostr_presence():
    """Remove provider from discovery."""
    deletion_event = create_deletion_event()
    await publish_to_relays(deletion_event, RELAYS)

Security Considerations

Key Management

  1. Secure Storage
# Never log private keys
SENSITIVE_VARS = ['NSEC', 'ADMIN_PASSWORD']

def sanitize_env(env_dict: dict) -> dict:
    return {
        k: '***' if k in SENSITIVE_VARS else v
        for k, v in env_dict.items()
    }
  2. Key Rotation
# Generate new keys
nostril --generate-keypair

# Update configuration
# Publish transition event
# Update all references

Event Validation

Verify provider events:

async def validate_provider_event(event: dict) -> bool:
    """Validate provider announcement."""
    # Check signature
    if not verify_signature(event):
        return False

    # Check required fields
    required = ['name', 'endpoints', 'models', 'pricing']
    content = json.loads(event['content'])
    if not all(field in content for field in required):
        return False

    # Verify endpoints are reachable
    if not await check_endpoints(content['endpoints']):
        return False

    return True

Relay Security

Choose relays carefully:

TRUSTED_RELAYS = {
    'wss://relay.damus.io': {
        'operator': 'Damus',
        'reputation': 'high',
        'filters_spam': True
    },
    'wss://relay.nostr.band': {
        'operator': 'Nostr.Band',
        'reputation': 'high',
        'paid_tier': True
    }
}
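One way to act on such a table is to publish only to relays whose metadata marks them as high reputation; a sketch (the example mapping restates the entries above plus one hypothetical low-reputation relay):

```python
def high_reputation_relays(relays: dict) -> list[str]:
    """Keep only relay URLs whose metadata marks them as high reputation."""
    return [url for url, meta in relays.items()
            if meta.get("reputation") == "high"]

# Example with one hypothetical low-reputation entry added:
relays = {
    "wss://relay.damus.io": {"operator": "Damus", "reputation": "high"},
    "wss://relay.nostr.band": {"operator": "Nostr.Band", "reputation": "high"},
    "wss://relay.example.org": {"operator": "Example", "reputation": "low"},
}
print(high_reputation_relays(relays))
# -> ['wss://relay.damus.io', 'wss://relay.nostr.band']
```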

Advanced Features

Multi-Relay Broadcasting

Ensure wide distribution:

async def broadcast_to_relays(event: dict, relays: list[str]):
    """Broadcast event to multiple relays."""
    tasks = []
    for relay in relays:
        task = asyncio.create_task(
            publish_to_relay(event, relay)
        )
        tasks.append(task)

    results = await asyncio.gather(*tasks, return_exceptions=True)

    successful = sum(1 for r in results if not isinstance(r, Exception))
    logger.info(f"Published to {successful}/{len(relays)} relays")

Provider Metadata

Extended metadata in events:

{
    "kind": 31990,
    "content": {
        "name": "Lightning AI",
        "description": "Enterprise AI API",
        "metadata": {
            "established": "2024-01-01",
            "total_requests": 1000000,
            "average_response_ms": 250,
            "supported_features": [
                "streaming",
                "function_calling",
                "vision",
                "embeddings"
            ],
            "certifications": ["SOC2", "GDPR"],
            "contact": {
                "nostr": "npub1contact...",
                "email": "support@lightning-ai.com"
            }
        }
    }
}

Discovery Filters

Advanced filtering options:

# Find providers with specific features
GET /v1/providers?features=streaming,vision&max_price=5&min_reliability=0.95

# Response includes filtered results
{
    "providers": [...],
    "filters_applied": {
        "features": ["streaming", "vision"],
        "max_price_sats_per_1k": 5,
        "min_reliability": 0.95
    },
    "total_providers": 50,
    "matching_providers": 12
}
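The same filters can be applied client-side when querying relays directly; a sketch operating on provider dicts shaped like the discovery response (the `features` field is assumed to mirror `supported_features` from the metadata):

```python
def filter_providers(providers, features=None, max_price=None,
                     min_reliability=None):
    """Apply the documented discovery filters to a local provider list."""
    out = []
    for p in providers:
        # Require every requested feature to be present
        if features and not set(features) <= set(p.get("features", [])):
            continue
        # Require at least one model at or under the price cap
        if max_price is not None:
            prices = [m["prompt_sats_per_1k"]
                      for m in p.get("pricing", {}).values()]
            if not prices or min(prices) > max_price:
                continue
        # Require a minimum reliability score
        if (min_reliability is not None
                and p.get("reliability_score", 0) < min_reliability):
            continue
        out.append(p)
    return out

providers = [
    {"name": "A", "features": ["streaming", "vision"],
     "pricing": {"gpt-4": {"prompt_sats_per_1k": 4}},
     "reliability_score": 0.99},
    {"name": "B", "features": ["streaming"],
     "pricing": {"gpt-4": {"prompt_sats_per_1k": 2}},
     "reliability_score": 0.90},
]
matches = filter_providers(providers, features=["streaming", "vision"],
                           max_price=5, min_reliability=0.95)
print([p["name"] for p in matches])
# -> ['A']
```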

Monitoring

Discovery Metrics

Track discovery performance:

class DiscoveryMetrics:
    def __init__(self):
        self.relay_health = {}
        self.provider_count = 0
        self.query_latency = []

    async def check_relay_health(self, relay_url: str):
        """Monitor relay connectivity."""
        start = time.time()
        try:
            await connect_to_relay(relay_url)
            latency = time.time() - start
            self.relay_health[relay_url] = {
                'status': 'healthy',
                'latency_ms': latency * 1000
            }
        except Exception as e:
            self.relay_health[relay_url] = {
                'status': 'unhealthy',
                'error': str(e)
            }

Provider Monitoring

@periodic_task(minutes=5)
async def monitor_providers():
    """Check provider health."""
    providers = await discover_providers()

    for provider in providers:
        try:
            # Test endpoint
            response = await test_provider_endpoint(provider.http_url)

            # Update metrics
            await update_provider_metrics(
                provider.npub,
                success=response.status_code == 200,
                response_time=response.elapsed
            )
        except Exception as e:
            logger.warning(f"Provider {provider.name} check failed: {e}")

Troubleshooting

No Providers Found

# Debug discovery issues
async def debug_discovery():
    """Diagnose discovery problems."""
    issues = []

    # Check relay connectivity
    for relay in RELAYS:
        if not await can_connect_to_relay(relay):
            issues.append(f"Cannot connect to {relay}")

    # Check event publishing
    if not await verify_own_events_visible():
        issues.append("Own events not visible on relays")

    # Check filters
    if len(await get_all_provider_events()) == 0:
        issues.append("No provider events on any relay")

    return issues

Relay Connection Issues

# Test relay connection
wscat -c wss://relay.damus.io

# Send subscription
["REQ","test",{"kinds":[31990],"#d":["routstr"]}]

Best Practices

For Providers

  1. Consistent Identity
     • Use the same npub across services
     • Maintain profile metadata
     • Verify identity on multiple platforms

  2. Regular Updates
     • Publish status every few hours
     • Update pricing promptly
     • Remove stale information

  3. Relay Diversity
     • Publish to 5+ relays
     • Include regional relays
     • Monitor relay health

For Clients

  1. Verify Providers
     • Check multiple relays
     • Verify endpoints work
     • Monitor reliability over time

  2. Cache Discovery
     • Cache the provider list
     • Refresh periodically
     • Handle stale data gracefully

  3. Fallback Options
     • Keep backup providers
     • Handle discovery failures
     • Support manual configuration
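The caching advice above can be sketched as a small TTL wrapper around discovery; `discover` is a stand-in for any of the discovery calls shown earlier, and the wrapper serves stale data when a refresh fails:

```python
import time

class ProviderCache:
    """Cache a provider list, refreshing after `ttl` seconds."""

    def __init__(self, discover, ttl: float = 300.0, clock=time.monotonic):
        self._discover = discover
        self._ttl = ttl
        self._clock = clock
        self._providers = None
        self._fetched_at = 0.0

    def get(self):
        now = self._clock()
        if self._providers is None or now - self._fetched_at > self._ttl:
            try:
                self._providers = self._discover()
                self._fetched_at = now
            except Exception:
                # Refresh failed: fall back to stale data if we have any
                if self._providers is None:
                    raise
        return self._providers
```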

Next Steps