How to Build your very own Google's NotebookLM

Published at: 11/28/2024
Author: Shagun Mistry
Categories: python, ai, machinelearning, google

With audio content consumption on the rise, converting documents and other written content into realistic audio has become increasingly popular.

While Google's NotebookLM has garnered attention in this space, I wanted to explore building a similar system using modern cloud services. In this article, I'll walk you through how I created a scalable, cloud-native system that converts documents into high-quality podcasts using FastAPI, Firebase, Google Cloud Pub/Sub, and Azure's Text-to-Speech service.

Here is a showcase you can refer to for the results of this system: MyPodify Showcase

The Challenge

Converting documents to podcasts isn't as simple as running text through a text-to-speech engine. It requires careful processing, natural language understanding, and the ability to handle various document formats while maintaining a smooth user experience. The system needs to:

  • Process multiple document formats efficiently
  • Generate natural-sounding audio with multiple voices
  • Handle large-scale document processing without affecting user experience
  • Provide real-time status updates to users
  • Maintain high availability and scalability

Architecture Deep Dive

Let's break down the key components and understand how they work together:

1. FastAPI Backend

FastAPI serves as our backend framework, chosen for several compelling reasons:

  • Async Support: Built on top of Starlette, FastAPI's async capabilities allow for efficient handling of concurrent requests
  • Automatic OpenAPI Documentation: Generates interactive API documentation out of the box
  • Type Safety: Uses Python's type hints, via Pydantic, to validate request data at runtime
  • High Performance: Among the faster Python frameworks; the FastAPI documentation describes its performance as on par with Node.js and Go

Here's a detailed look at our upload endpoint:

import uuid
from datetime import datetime, timezone
from typing import Annotated, List, Optional

from fastapi import Depends, FastAPI, File, UploadFile

app = FastAPI()

# ParsedToken, verify_firebase_token, process_uploads, create_project_document
# and publish_to_pubsub are project helpers defined elsewhere.

@app.post('/upload')
async def upload_files(
    token: Annotated[ParsedToken, Depends(verify_firebase_token)],
    project_name: str,
    description: str,
    website_link: str,
    host_count: int,
    files: Optional[List[UploadFile]] = File(None)
):
    # The dependency has already verified the token; read the user ID from it
    user_id = token['uid']

    # Generate unique identifiers
    project_id = str(uuid.uuid4())
    podcast_id = str(uuid.uuid4())

    # Process and store files
    file_urls = await process_uploads(files, user_id, project_id)

    # Create Firestore document (timezone-aware timestamp)
    await create_project_document(user_id, project_id, {
        'status': 'pending',
        'created_at': datetime.now(timezone.utc),
        'project_name': project_name,
        'description': description,
        'file_urls': file_urls
    })

    # Trigger async processing
    await publish_to_pubsub(user_id, project_id, podcast_id, file_urls)

    return {'project_id': project_id, 'status': 'processing'}

2. Firebase Integration

Firebase provides two crucial services for our application:

Firebase Storage

  • Handles secure file uploads with automatic scaling
  • Provides CDN-backed distribution for generated audio files
  • Supports resumable uploads for large files

Firestore

  • Real-time database for project status tracking
  • Document-based structure perfect for project metadata
  • Automatic scaling with no manual sharding required

Here's how we implement real-time status updates:

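from datetime import datetime, timezone
from typing import Optional

async def update_status(user_id: str, project_id: str, status: str, metadata: Optional[dict] = None):
    # `db` is assumed to be a firestore.AsyncClient, which makes update() awaitable.
    # Firestore document IDs cannot contain '/', so the user and project IDs are
    # addressed as separate path segments. This nested layout is an assumption;
    # adjust it to match your own collection structure.
    doc_ref = (
        db.collection('users').document(user_id)
        .collection('projects').document(project_id)
    )

    update_data = {
        'status': status,
        'updated_at': datetime.now(timezone.utc)
    }

    if metadata:
        update_data.update(metadata)

    await doc_ref.update(update_data)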
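
The status values used in this article ('pending', 'processing', 'completed') suggest a simple lifecycle. As a rough sketch, a worker could check a transition before writing it to Firestore. The 'failed' state and the `ALLOWED_TRANSITIONS` table are assumptions for illustration and do not come from the original code.

```python
# Hypothetical lifecycle: the 'failed' state and this table are assumptions.
ALLOWED_TRANSITIONS = {
    "pending": {"processing", "failed"},
    "processing": {"completed", "failed"},
    "completed": set(),
    "failed": set(),
}


def is_valid_transition(current: str, new: str) -> bool:
    """Return True if a project may move from `current` to `new`."""
    return new in ALLOWED_TRANSITIONS.get(current, set())
```

Calling `is_valid_transition` before `update_status` would keep a late or duplicated message from moving a finished project back to an earlier state.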

3. Google Cloud Pub/Sub

Pub/Sub serves as our messaging backbone, enabling:

  • Decoupled architecture for better scalability
  • At-least-once delivery guarantee
  • Automatic message retention and replay
  • Dead-letter topics for messages that repeatedly fail processing

Message structure example:

{
    'user_id': 'uid_123',
    'project_id': 'proj_456',
    'podcast_id': 'pod_789',
    'file_urls': ['gs://bucket/file1.pdf'],
    'description': 'Technical blog post about cloud architecture',
    'host_count': 2,
    'action': 'CREATE_PROJECT'
}
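
Pub/Sub carries message data as bytes, so a structure like the one above has to be serialized before publishing and parsed again in the worker. The following is a minimal sketch that assumes JSON encoding; `REQUIRED_FIELDS`, `encode_message` and `decode_message` are hypothetical names, not part of the original system.

```python
import json

# Field names taken from the message example; treating them all as
# required is an assumption.
REQUIRED_FIELDS = (
    "user_id", "project_id", "podcast_id", "file_urls", "host_count", "action",
)


def _check_fields(payload: dict) -> None:
    """Raise ValueError if any required field is absent."""
    missing = [name for name in REQUIRED_FIELDS if name not in payload]
    if missing:
        raise ValueError(f"missing fields: {missing}")


def encode_message(payload: dict) -> bytes:
    """Validate a message dict and serialize it to UTF-8 JSON bytes."""
    _check_fields(payload)
    return json.dumps(payload).encode("utf-8")


def decode_message(data: bytes) -> dict:
    """Parse message bytes received by the worker and validate them."""
    payload = json.loads(data.decode("utf-8"))
    _check_fields(payload)
    return payload
```

Validating on both sides means a malformed message fails fast in the worker instead of partway through processing.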

4. Voice Generation with Azure Speech Service

The core of our audio generation uses Azure's Cognitive Services Speech SDK. Let's look at how we implement natural-sounding voice synthesis:

import asyncio
import logging
import os

import azure.cognitiveservices.speech as speechsdk

logger = logging.getLogger(__name__)

class SpeechGenerator:
    def __init__(self):
        self.speech_config = speechsdk.SpeechConfig(
            subscription=os.getenv("AZURE_SPEECH_KEY"),
            region=os.getenv("AZURE_SPEECH_REGION")
        )

    async def create_speech_segment(self, text, voice, output_file):
        try:
            self.speech_config.speech_synthesis_voice_name = voice
            # audio_config=None keeps the audio in memory instead of playing it
            synthesizer = speechsdk.SpeechSynthesizer(
                speech_config=self.speech_config,
                audio_config=None
            )

            # .get() blocks until synthesis finishes, so wait for it in a
            # worker thread to keep the event loop free
            future = synthesizer.speak_text_async(text)
            result = await asyncio.to_thread(future.get)

            if result.reason == speechsdk.ResultReason.SynthesizingAudioCompleted:
                with open(output_file, "wb") as audio_file:
                    audio_file.write(result.audio_data)
                return True

            if result.reason == speechsdk.ResultReason.Canceled:
                details = result.cancellation_details
                logger.error(f"Speech synthesis canceled: {details.reason}")

            return False

        except Exception as e:
            logger.error(f"Speech synthesis failed: {str(e)}")
            return False

One of the unique features of our system is the ability to generate multi-voice podcasts using AI. Here's how we handle script generation for different hosts:

async def generate_podcast_script(outline: str, analysis: str, host_count: int):
    # TWO_HOST_SYSTEM_PROMPT, ONE_HOST_SYSTEM_PROMPT and
    # generate_content_from_openai are defined elsewhere in the project.
    system_instructions = TWO_HOST_SYSTEM_PROMPT if host_count > 1 else ONE_HOST_SYSTEM_PROMPT

    # Example lines showing the speaker-label format the script should follow.
    # {topic} is a literal placeholder for the model, not a Python format field.
    if host_count > 1:
        script_format = """
        **Alex**: "Hello and welcome to MyPodify! I'm your host Alex, joined by..."
        **Jane**: "Hi everyone! I'm Jane, and today we're diving into {topic}..."
        """
    else:
        script_format = """
        **Alex**: "Welcome to MyPodify! Today we're exploring {topic}..."
        """

    # Generate the complete script using AI, passing the format example along
    # so that script_format is actually used
    script = await generate_content_from_openai(
        content=f"{outline}\n\nContent Details:{analysis}\n\nScript Format:{script_format}",
        system_instructions=system_instructions,
        purpose="Podcast Script"
    )

    return script

For voice synthesis, we map different speakers to specific Azure voices:

VOICE_MAPPING = {
    'Alex': 'en-US-AndrewMultilingualNeural',  # Male host
    'Jane': 'en-US-AvaMultilingualNeural',     # Female host
    'Narrator': 'en-US-BrandonMultilingualNeural'  # Neutral voice
}
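
Before synthesis, the generated script has to be split into per-speaker segments so that each line can be sent to the right voice. The sketch below assumes the script follows the `**Name**: "text"` format shown in the prompt example. `split_script`, the regular expression, and the fallback to the narrator voice for unknown speakers are illustrative assumptions.

```python
import re

VOICE_MAPPING = {
    "Alex": "en-US-AndrewMultilingualNeural",
    "Jane": "en-US-AvaMultilingualNeural",
    "Narrator": "en-US-BrandonMultilingualNeural",
}

# Matches lines such as: **Alex**: "Hello and welcome..."
SPEAKER_LINE = re.compile(r'^\s*\*\*(?P<speaker>[^*]+)\*\*:\s*(?P<text>.+?)\s*$')


def split_script(script: str) -> list[tuple[str, str]]:
    """Return (voice_name, text) pairs, one per speaker line in the script."""
    segments = []
    for line in script.splitlines():
        match = SPEAKER_LINE.match(line)
        if not match:
            continue  # skip blank lines and lines without a speaker label
        speaker = match.group("speaker").strip()
        text = match.group("text")
        # Drop one pair of surrounding double quotes, if present
        if len(text) >= 2 and text[0] == '"' and text[-1] == '"':
            text = text[1:-1]
        # Unknown speakers fall back to the narrator voice (an assumption)
        voice = VOICE_MAPPING.get(speaker, VOICE_MAPPING["Narrator"])
        segments.append((voice, text))
    return segments
```

Each returned pair can then be passed to something like `create_speech_segment(text, voice, output_file)`.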

5. Background Processing Worker

The worker component handles the heavy lifting:

  1. Document Analysis

    • Extract text from various document formats
    • Analyze document structure and content
    • Identify key topics and sections
  2. Content Processing

    • Generate natural conversation flow
    • Split content into speaker segments
    • Create transitions between topics
  3. Audio Generation

    • Convert text to speech using Azure's neural voices
    • Handle multiple speaker voices
    • Apply audio post-processing

Here's a simplified view of our worker logic:

async def process_document(message_data: dict):
    try:
        # Extract content from documents
        content = await extract_document_content(message_data['file_urls'])

        # Analyze and structure content
        document_analysis = await analyze_content(content)

        # Generate podcast script
        script = await generate_script(
            document_analysis,
            speaker_count=message_data['host_count']
        )

        # Convert to audio
        audio_segments = await generate_audio_segments(script)

        # Post-process audio
        final_audio = await post_process_audio(audio_segments)

        # Upload and update status
        audio_url = await upload_to_storage(final_audio)
        await update_status(
            message_data['user_id'],
            message_data['project_id'],
            'completed',
            {'audio_url': audio_url}
        )

    except Exception as e:
        await handle_processing_error(message_data, e)
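
The article does not show the post-processing step. One minimal way to join the per-speaker segments is sketched here, assuming every segment was synthesized as uncompressed WAV with identical parameters. `concat_wav_segments` is a hypothetical helper that uses only the standard library.

```python
import io
import wave


def concat_wav_segments(segments: list[bytes]) -> bytes:
    """Join WAV byte strings that share the same audio parameters."""
    if not segments:
        raise ValueError("no segments to join")

    params = None
    frames = []
    for data in segments:
        with wave.open(io.BytesIO(data), "rb") as reader:
            fmt = (reader.getnchannels(), reader.getsampwidth(), reader.getframerate())
            if params is None:
                params = fmt
            elif fmt != params:
                raise ValueError("segments have different audio formats")
            frames.append(reader.readframes(reader.getnframes()))

    # Write one header followed by all frames in order
    output = io.BytesIO()
    with wave.open(output, "wb") as writer:
        writer.setnchannels(params[0])
        writer.setsampwidth(params[1])
        writer.setframerate(params[2])
        writer.writeframes(b"".join(frames))
    return output.getvalue()
```

If the segments are stored in a compressed format such as MP3, this approach does not apply and an audio library would be needed instead.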

Error Handling and Reliability

The system implements comprehensive error handling:

  1. Retry Logic

    • Exponential backoff for failed API calls
    • Maximum retry attempts configuration
    • Dead letter queue for failed messages
  2. Status Tracking

    • Detailed error messages stored in Firestore
    • Real-time status updates to users
    • Error aggregation for monitoring
  3. Resource Cleanup

    • Automatic temporary file deletion
    • Failed upload cleanup
    • Orphaned resource detection
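
The retry logic described above can be sketched as a small helper. This is an illustration under assumed defaults rather than the system's actual implementation: `retry_with_backoff`, its parameters, and the jitter range are all hypothetical.

```python
import asyncio
import random


async def retry_with_backoff(func, *, max_attempts=5, base_delay=0.5, max_delay=30.0):
    """Call an async function, retrying failures with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await func()
        except Exception:
            if attempt == max_attempts:
                # Out of retries: re-raise so the caller can fail the message
                raise
            # The delay doubles on each attempt and is capped at max_delay;
            # random jitter spreads out retries from concurrent workers
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            await asyncio.sleep(delay * random.uniform(0.5, 1.0))
```

A call such as `await retry_with_backoff(lambda: analyze_content(content))` would wrap one of the worker steps. Once the final attempt fails, the exception propagates and the message can be left for the dead-letter path.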

Scaling and Performance Optimizations

To handle production loads, we've implemented several optimizations:

  1. Worker Scaling

    • Horizontal scaling based on queue length
    • Resource-based autoscaling
    • Regional deployment for lower latency
  2. Storage Optimization

    • Content deduplication
    • Compressed audio storage
    • CDN integration for delivery
  3. Processing Optimization

    • Batch processing for similar documents
    • Caching for repeated content
    • Parallel processing where possible
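
Content deduplication and caching can both be keyed on a hash of the uploaded bytes. The sketch below shows the idea with an in-memory dictionary standing in for a persistent store. `content_key` and `PodcastCache` are hypothetical names, and including `host_count` in the key is an assumption about which settings change the output.

```python
import hashlib
from typing import Optional


def content_key(data: bytes, host_count: int) -> str:
    """Build a cache key from the document bytes and the host count."""
    digest = hashlib.sha256(data).hexdigest()
    return f"{digest}:{host_count}"


class PodcastCache:
    """In-memory stand-in for a persistent cache of generated audio URLs."""

    def __init__(self) -> None:
        self._audio_urls: dict[str, str] = {}

    def get(self, key: str) -> Optional[str]:
        """Return the cached audio URL, or None if the content is new."""
        return self._audio_urls.get(key)

    def put(self, key: str, audio_url: str) -> None:
        """Remember the audio URL generated for this content."""
        self._audio_urls[key] = audio_url
```

A worker would compute the key first and skip script generation and synthesis entirely when `get` returns a URL.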

Monitoring and Observability

The system includes comprehensive monitoring:

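from datetime import datetime, timezone
from typing import Optional

async def track_metrics(stage: str, duration: float, metadata: Optional[dict] = None):
    # `duration` is in seconds; publish_metrics is defined elsewhere
    metrics = {
        'stage': stage,
        'duration_ms': duration * 1000,
        'timestamp': datetime.now(timezone.utc)
    }

    if metadata:
        metrics.update(metadata)

    await publish_metrics(metrics)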
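
A function like `track_metrics` is easiest to use through a timing wrapper, so that each stage reports its own duration. The following sketch assumes the reporter takes `(stage, duration, metadata)` as above; `timed_stage` is a hypothetical helper.

```python
import time
from contextlib import asynccontextmanager


@asynccontextmanager
async def timed_stage(stage, report, metadata=None):
    """Time the enclosed block and pass the elapsed seconds to `report`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Report even if the block raised, so failed stages are measured too
        await report(stage, time.perf_counter() - start, metadata)
```

In the worker this could look like `async with timed_stage('audio_generation', track_metrics): ...` around each processing step.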

Future Enhancements

While the current system works well, there are several exciting possibilities for future improvements:

  1. Enhanced Audio Processing

    • Background music integration
    • Advanced audio effects
    • Custom voice training
  2. Content Enhancement

    • Automatic chapter markers
    • Interactive transcripts
    • Multi-language support
  3. Platform Integration

    • Direct podcast platform publishing
    • RSS feed generation
    • Social media sharing

Building a document-to-podcast converter has been an exciting journey into modern cloud architecture. The combination of FastAPI, Firebase, Google Cloud Pub/Sub, and Azure's Text-to-Speech services provides a robust foundation for handling complex document processing at scale.

The event-driven architecture ensures the system remains responsive under load, while the use of managed services reduces operational overhead. Whether you're building a similar system or just exploring cloud-native architectures, I hope this deep dive has provided valuable insights into building scalable, production-ready applications.

Want to learn more about cloud architecture and modern application development? Follow me for more technical and practical tutorials.
