The Core of FastAPI: A Deep Dive into Starlette 🌟🌟🌟

Published at
1/13/2025
Categories
webdev
python
fastapi
django
Author
Leapcell

FastAPI is essentially an API wrapper for Starlette. To fully grasp FastAPI, one must first understand Starlette.

1. ASGI Protocol

Uvicorn interacts with ASGI applications through a common interface. An application exchanges messages with Uvicorn by implementing a callable with the following signature:

async def app(scope, receive, send):
    # The simplest ASGI application
    assert scope['type'] == 'http'
    await send({
        'type': 'http.response.start',
        'status': 200,
        'headers': [
            [b'content-type', b'text/plain'],
        ]
    })
    await send({
        'type': 'http.response.body',
        'body': b'Hello, world!',
    })
if __name__ == "__main__":
    # Uvicorn service
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=5000, log_level="info")
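
To make the interface concrete, the same application can be called by hand, without Uvicorn. The sketch below is only an illustration: call_app_once is a hypothetical helper that plays the server's role for a single request, and the scope it builds contains only a few of the keys a real server would provide.

```python
import asyncio


async def app(scope, receive, send):
    # Same minimal application as above, repeated so the sketch is self-contained
    assert scope["type"] == "http"
    await send({
        "type": "http.response.start",
        "status": 200,
        "headers": [[b"content-type", b"text/plain"]],
    })
    await send({"type": "http.response.body", "body": b"Hello, world!"})


async def call_app_once(asgi_app):
    # Hypothetical helper: acts as the server for one request
    sent = []

    async def receive():
        # A request with an empty body
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        # Collect what the application sends instead of writing to a socket
        sent.append(message)

    # A real server fills in many more keys; these are enough for this app
    scope = {"type": "http", "method": "GET", "path": "/", "headers": []}
    await asgi_app(scope, receive, send)
    return sent


messages = asyncio.run(call_app_once(app))
print([message["type"] for message in messages])
```

The application sends two messages: one that starts the response (status and headers) and one that carries the body.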

2. Starlette

To start Starlette with Uvicorn, use the following code:

from starlette.applications import Starlette
from starlette.middleware.gzip import GZipMiddleware

app: Starlette = Starlette()

@app.route("/")
def demo_route() -> None: pass

@app.websocket_route("/")
def demo_websocket_route() -> None: pass

@app.exception_handler(404)
def not_found_route() -> None: pass

@app.on_event("startup")
def startup_event_demo() -> None: pass

@app.on_event("shutdown")
def shutdown_event_demo() -> None: pass

app.add_middleware(GZipMiddleware)

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=5000)

This code initializes Starlette, registers routes, exception handlers, events, and middleware, and then passes the app to uvicorn.run. Uvicorn then delivers each request by calling Starlette's __call__ method.

Let's first analyze the initialization of Starlette:

class Starlette:
    def __init__(
        self,
        debug: bool = False,
        routes: typing.Sequence[BaseRoute] = None,
        middleware: typing.Sequence[Middleware] = None,
        exception_handlers: typing.Dict[
            typing.Union[int, typing.Type[Exception]], typing.Callable
        ] = None,
        on_startup: typing.Sequence[typing.Callable] = None,
        on_shutdown: typing.Sequence[typing.Callable] = None,
        lifespan: typing.Callable[["Starlette"], typing.AsyncGenerator] = None,
    ) -> None:
        """
        :param debug: Whether to enable debug mode.
        :param routes: A list of routes, providing HTTP and WebSocket services.
        :param middleware: A list of middleware, applied to each request.
        :param exception_handlers: A dictionary of exception callbacks, keyed by HTTP status code or exception class, with callback functions as values.
        :param on_startup: Callback functions called during startup.
        :param on_shutdown: Callback functions called during shutdown.
        :param lifespan: The lifespan function in ASGI.
        """

        # If lifespan is passed, on_startup and on_shutdown cannot be passed,
        # because Starlette converts on_startup and on_shutdown into a lifespan that Uvicorn drives
        assert lifespan is None or (
            on_startup is None and on_shutdown is None
        ), "Use either 'lifespan' or 'on_startup'/'on_shutdown', not both."

        # Initialize variables
        self._debug = debug
        self.state = State()
        self.router = Router(
            routes, on_startup=on_startup, on_shutdown=on_shutdown, lifespan=lifespan
        )
        self.exception_handlers = (
            {} if exception_handlers is None else dict(exception_handlers)
        )
        self.user_middleware = [] if middleware is None else list(middleware)
        # Build middleware
        self.middleware_stack = self.build_middleware_stack()

As the code shows, initialization mostly stores its arguments. The one step that needs further analysis is the function that builds the middleware stack:

class Starlette:
    def build_middleware_stack(self) -> ASGIApp:
        debug = self.debug
        error_handler = None
        exception_handlers = {}

        # Split the registered exception callbacks into error_handler and exception_handlers
        # A handler registered for status code 500 or for Exception itself is stored in error_handler
        for key, value in self.exception_handlers.items():
            if key in (500, Exception):
                error_handler = value
            else:
                exception_handlers[key] = value

        # Order different types of middleware
        # The first layer is ServerErrorMiddleware, which can print the error stack when an exception is found, or display an error page in debug mode for easy debugging.
        # The second layer is the user middleware layer, where all user-registered middleware will be stored.
        # The third layer is ExceptionMiddleware, which is the exception handling layer and will handle all exceptions thrown during route execution.
        middleware = (
            [Middleware(ServerErrorMiddleware, handler=error_handler, debug=debug)]
            + self.user_middleware
            + [
                Middleware(
                    ExceptionMiddleware, handlers=exception_handlers, debug=debug
                )
            ]
        )

        # Finally, load the middleware into the app
        app = self.router
        for cls, options in reversed(middleware):
            # cls is the middleware class itself, and options are the parameters we pass
            # It can be seen that the middleware itself is also an ASGI APP, and loading middleware is like nesting one ASGI APP inside another, like a matryoshka doll.
            app = cls(app=app, **options)

        # Iterating in reverse wraps the router with the last middleware first, so the first entry in the list (ServerErrorMiddleware) becomes the outermost ASGI APP and receives each request first.
        return app

After the middleware stack is built, initialization is complete. Uvicorn then invokes the application's __call__ method for each request:

class Starlette:
    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        scope["app"] = self
        await self.middleware_stack(scope, receive, send)

This method is simple. It stores the app in scope so that later stages of the request can reach it, and then starts request processing by calling middleware_stack. Together with the middleware initialization, this shows that middleware in Starlette is itself an ASGI APP, and that the router is the ASGI APP at the bottom of the call stack. Starlette also hands exception handling over to middleware, which is rarely seen in other web frameworks. Starlette is designed so that each component is, as far as possible, an ASGI APP.
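
The nesting performed by build_middleware_stack can be illustrated without Starlette. In the sketch below, TagMiddleware and build_stack are hypothetical names, and the three layers only record when they are entered and left; they stand in for ServerErrorMiddleware, the user middleware, and ExceptionMiddleware.

```python
import asyncio


class TagMiddleware:
    # Hypothetical middleware: records when it is entered and left
    def __init__(self, app, name, log):
        self.app = app
        self.name = name
        self.log = log

    async def __call__(self, scope, receive, send):
        self.log.append(f"enter {self.name}")
        await self.app(scope, receive, send)
        self.log.append(f"leave {self.name}")


def build_stack(innermost, middleware):
    # Same loop shape as build_middleware_stack: wrap from the last entry to the first
    app = innermost
    for cls, options in reversed(middleware):
        app = cls(app=app, **options)
    return app


log = []


async def router(scope, receive, send):
    # Stand-in for the Router at the bottom of the stack
    log.append("router")


stack = build_stack(router, [
    (TagMiddleware, {"name": "server_error", "log": log}),
    (TagMiddleware, {"name": "user", "log": log}),
    (TagMiddleware, {"name": "exception", "log": log}),
])
asyncio.run(stack({"type": "http"}, None, None))
print(log)
```

The first entry in the list ends up outermost: it is entered first and left last, which is why the list is traversed in reverse while wrapping.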

2. Middleware

As mentioned above, in Starlette, middleware is an ASGI APP. So, all middleware in Starlette must be a class that meets the following form:

class BaseMiddleware:
    def __init__(self, app: ASGIApp) -> None:
        pass

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        pass

In starlette.middleware, there are many middleware implementations that meet this requirement. However, this chapter will not cover all middleware, but only select a few representative ones for analysis from the closest to the route to the farthest.
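
As an illustration of this form, here is a minimal sketch of a middleware written directly against the ASGI interface. AddHeaderMiddleware, the x-demo header, and the run_once helper are made up for this example and are not part of Starlette.

```python
import asyncio


class AddHeaderMiddleware:
    # Hypothetical middleware: appends one header to every HTTP response
    def __init__(self, app, name=b"x-demo", value=b"1"):
        self.app = app
        self.name = name
        self.value = value

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            # Pass other traffic (for example WebSocket) through untouched
            await self.app(scope, receive, send)
            return

        async def send_with_header(message):
            # Only the message that starts the response carries headers
            if message["type"] == "http.response.start":
                headers = list(message.get("headers", []))
                headers.append((self.name, self.value))
                message = {**message, "headers": headers}
            await send(message)

        await self.app(scope, receive, send_with_header)


async def hello(scope, receive, send):
    # The wrapped ASGI APP
    await send({"type": "http.response.start", "status": 200, "headers": []})
    await send({"type": "http.response.body", "body": b"hi"})


async def run_once(app):
    # Hypothetical helper that stands in for the server
    sent = []

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        sent.append(message)

    await app({"type": "http"}, receive, send)
    return sent


sent = asyncio.run(run_once(AddHeaderMiddleware(hello)))
print(sent[0]["headers"])
```

The middleware never touches the wrapped application directly; it only replaces the send callable that the application sees.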

2.1. Exception Handling Middleware - ExceptionMiddleware

The first is the ExceptionMiddleware. Users do not directly interact with this middleware (so it is not placed in starlette.middleware), but interact with it indirectly through the following method:

@app.exception_handler(404)
def not_found_route() -> None: pass

When the user registers a callback this way, Starlette stores it in the corresponding dictionary, with the HTTP status code as the key and the callback function as the value.
When ExceptionMiddleware catches an HTTPException raised during route processing, it looks up the callback by the exception's HTTP status code, passes the request and the exception to the user's callback, and sends the callback's response back through the enclosing ASGI APP.
ExceptionMiddleware also supports registering callbacks by exception class. When the exception raised by the route is an instance of a registered class, the callback registered for that class is called.
The source code of this class, with comments, is as follows:

class ExceptionMiddleware:
    def __init__(
        self, app: ASGIApp, handlers: dict = None, debug: bool = False
    ) -> None:
        self.app = app
        self.debug = debug  # TODO: We ought to handle 404 cases if debug is set.
        # Starlette supports both HTTP status codes and Exception types
        self._status_handlers = {}  # type: typing.Dict[int, typing.Callable]
        self._exception_handlers = {
            HTTPException: self.http_exception
        }  # type: typing.Dict[typing.Type[Exception], typing.Callable]
        if handlers is not None:
            for key, value in handlers.items():
                self.add_exception_handler(key, value)

    def add_exception_handler(
        self,
        exc_class_or_status_code: typing.Union[int, typing.Type[Exception]],
        handler: typing.Callable,
    ) -> None:
        # The exception callbacks mounted by the user through the Starlette app method are finally mounted into the _status_handlers or _exception_handler in the class through this method.
        if isinstance(exc_class_or_status_code, int):
            self._status_handlers[exc_class_or_status_code] = handler
        else:
            assert issubclass(exc_class_or_status_code, Exception)
            self._exception_handlers[exc_class_or_status_code] = handler

    def _lookup_exception_handler(
        self, exc: Exception
    ) -> typing.Optional[typing.Callable]:
        # Look up the callback function related to the registered exception, find the corresponding callback function for the exception through mro
        # 
        # The user may mount a base class, and subsequent subclasses of the mounted exception will also call the callback registered for the base class.
        # For example, the user registers a base class, and then there are two exceptions, user exception and system exception, both inheriting from this base class.
        # When the function throws the user exception or the system exception later, the corresponding callback registered for the base class will be executed.
        for cls in type(exc).__mro__:
            if cls in self._exception_handlers:
                return self._exception_handlers[cls]
        return None

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        # The familiar ASGI call method
        if scope["type"] != "http":
            # Does not support WebSocket requests
            await self.app(scope, receive, send)
            return

        # Prevent multiple exceptions from occurring in the same response
        response_started = False

        async def sender(message: Message) -> None:
            nonlocal response_started

            if message["type"] == "http.response.start":
                response_started = True
            await send(message)

        try:
            # Call the next ASGI APP
            await self.app(scope, receive, sender)
        except Exception as exc:
            handler = None

            if isinstance(exc, HTTPException):
                # If it is an HTTPException, look for it in the registered HTTP callback dictionary
                handler = self._status_handlers.get(exc.status_code)

            if handler is None:
                # If it is a normal exception, look for it in the exception callback dictionary
                handler = self._lookup_exception_handler(exc)

            if handler is None:
                # If the corresponding exception cannot be found, throw it upwards
                raise exc from None

            # Only handle one exception per response
            if response_started:
                msg = "Caught handled exception, but response already started."
                raise RuntimeError(msg) from exc

            request = Request(scope, receive=receive)
            if asyncio.iscoroutinefunction(handler):
                response = await handler(request, exc)
            else:
                response = await run_in_threadpool(handler, request, exc)
            # Process the request with the response generated by the callback function
            await response(scope, receive, sender)
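
The MRO-based lookup in _lookup_exception_handler can be tried in isolation. In this sketch the exception classes, the handler, and lookup_handler are hypothetical names chosen for illustration; only the loop over type(exc).__mro__ mirrors the source above.

```python
class AppError(Exception):
    """Hypothetical base class registered by the user."""


class UserError(AppError):
    """Hypothetical subclass raised by a route."""


class SystemFault(AppError):
    """Another hypothetical subclass."""


def handle_app_error(exc):
    # Hypothetical callback registered for the base class
    return f"handled {type(exc).__name__}"


handlers = {AppError: handle_app_error}


def lookup_handler(exc, registry):
    # Walk the class hierarchy from the most specific class to the most general
    for cls in type(exc).__mro__:
        if cls in registry:
            return registry[cls]
    return None


print(lookup_handler(UserError(), handlers)(UserError()))
print(lookup_handler(ValueError(), handlers))
```

Because the MRO starts with the exception's own class, a handler registered for a subclass takes precedence over one registered for its base class.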

2.2. User Middleware

Next is the user middleware, which is the middleware we come into contact with the most. When using starlette.middleware, we usually inherit from a middleware called BaseHTTPMiddleware and expand based on the following code:

class DemoMiddleware(BaseHTTPMiddleware):
    def __init__(
        self,
        app: ASGIApp,
    ) -> None:
        super(DemoMiddleware, self).__init__(app)

    async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -> Response:
        # Before
        response: Response = await call_next(request)
        # After
        return response

To preprocess the request, write the code in the Before block. To post-process the response, write it in the After block. Both blocks are in the same function scope, so variables do not need to be passed through a context object or context variables (if you have worked with middleware in Django or Flask, you will appreciate the elegance of Starlette's implementation).
Now let's see how it is implemented. The code is short, about 60 lines, but heavily commented:

class BaseHTTPMiddleware:
    def __init__(self, app: ASGIApp, dispatch: DispatchFunction = None) -> None:
        # Assign the next-level ASGI app
        self.app = app
        # If the user passes dispatch, use the user-passed function, otherwise use its own dispatch
        # Generally, users inherit from BaseHTTPMiddleware and rewrite the dispatch method
        self.dispatch_func = self.dispatch if dispatch is None else dispatch

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """
        A function with the ASGI standard function signature, representing that ASGI requests will enter from here.
        """
        if scope["type"] != "http":
            # If the type is not http, the middleware will not be passed (that is, WebSocket is not supported)
            # To support WebSocket, it is very difficult to implement the middleware in this way. When I implemented the rap framework, I sacrificed some functions to achieve the middleware processing for WebSocket-like traffic.
            await self.app(scope, receive, send)
            return

        # Generate a request object from scope
        request = Request(scope, receive=receive)
        # Enter the dispatch logic, that is, the user's processing logic
        # The response obtained from this logic is actually generated by the call_next function, and the dispatch function only plays a role of passing.
        response = await self.dispatch_func(request, self.call_next)
        # Return data to the upper layer according to the generated response
        await response(scope, receive, send)

    async def call_next(self, request: Request) -> Response:
        loop = asyncio.get_event_loop()
        # Obtain the next-level message through the queue production and consumption model
        queue: "asyncio.Queue[typing.Optional[Message]]" = asyncio.Queue()

        scope = request.scope
        # Pass the uvicorn's receive object through the request.receive object
        # The receive object used here is still the receive object initialized by uvicorn
        receive = request.receive
        send = queue.put

        async def coro() -> None:
            try:
                await self.app(scope, receive, send)
            finally:
                # This put operation ensures that the get side will not be blocked
                await queue.put(None)

        # Run the next ASGI APP in another coroutine through loop.create_task
        task = loop.create_task(coro())
        # Wait for the return of the next ASGI APP
        message = await queue.get()
        if message is None:
            # If the obtained value is empty, it means that the next ASGI APP has not returned a response, and an error may have occurred.
            # By calling task.result(), if the coroutine has an exception, the error of the coroutine will be thrown.
            task.result()
            # If no exception is thrown, it may be due to user error, such as returning an empty response.
            # At this time, it is impossible to return a response to the client, so an exception needs to be created to facilitate the subsequent generation of a 500 response.
            raise RuntimeError("No response returned.")

        # When ASGI processes the response, it is done in multiple steps. Normally, the above queue.get is the first step to obtain the response.
        assert message["type"] == "http.response.start"

        async def body_stream() -> typing.AsyncGenerator[bytes, None]:
            # Other processing will be handed over to the body_stream function
            # This method simply keeps returning the data stream
            while True:
                message = await queue.get()
                if message is None:
                    break
                assert message["type"] == "http.response.body"
                yield message.get("body", b"")
            task.result()

        # Put the body_stream function into the Response method
        # The response itself is also a class similar to an ASGI APP. According
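
The producer and consumer hand-off used by call_next can be reduced to a small sketch. The names downstream and call_next_sketch are hypothetical, the status and body are collected into a tuple instead of a Response object, and the sketch awaits the task where the source above calls task.result().

```python
import asyncio


async def downstream(send):
    # Stand-in for the next ASGI APP: one start message, then two body chunks
    await send({"type": "http.response.start", "status": 200})
    await send({"type": "http.response.body", "body": b"Hello, "})
    await send({"type": "http.response.body", "body": b"world!"})


async def call_next_sketch():
    queue = asyncio.Queue()

    async def coro():
        try:
            # queue.put plays the role of the ASGI send callable
            await downstream(queue.put)
        finally:
            # Sentinel: guarantees the consumer never waits forever
            await queue.put(None)

    task = asyncio.create_task(coro())

    # The first message must be the start of the response
    first = await queue.get()
    if first is None:
        await task  # re-raises the producer's exception, if there was one
        raise RuntimeError("No response returned.")
    assert first["type"] == "http.response.start"

    # The remaining messages are body chunks, terminated by the sentinel
    chunks = []
    while True:
        message = await queue.get()
        if message is None:
            break
        chunks.append(message.get("body", b""))
    await task
    return first["status"], b"".join(chunks)


result = asyncio.run(call_next_sketch())
print(result)
```

The sentinel placed in the finally block is what lets the consumer distinguish "the producer finished" from "the producer is still working".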

2.3. ServerErrorMiddleware

ServerErrorMiddleware is very similar to ExceptionMiddleware, so it is not covered in detail here. Their overall logic is mostly the same. However, while ExceptionMiddleware captures and handles exceptions raised by routes, ServerErrorMiddleware serves as a fallback that ensures a valid HTTP response is always returned.

Its callback is registered in the same way as for ExceptionMiddleware, but only a callback registered for HTTP status code 500 (or for the Exception class itself) is passed to ServerErrorMiddleware:

@app.exception_handler(500)
def server_error_route() -> None: pass

ServerErrorMiddleware is the outermost layer of the ASGI APP stack and handles the exceptions that nothing else caught. Its job is simple: if an exception escapes from the next-level ASGI APP, it enters the fallback logic:

  1. If debug is enabled, return the debug page.
  2. If there is a registered callback, execute the registered callback.
  3. If neither of the above, return a 500 response.
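
The order of these fallbacks can be written down as a small function. This is a simplified sketch of the decision only: server_error_fallback and its return values are hypothetical, and it does not reproduce how ServerErrorMiddleware builds or sends its responses.

```python
def server_error_fallback(exc, debug=False, handler=None):
    # 1. Debug mode wins: show a page describing the exception
    if debug:
        return 500, f"debug page for {exc!r}"
    # 2. Otherwise use the callback registered for status code 500, if any
    if handler is not None:
        return handler(exc)
    # 3. Otherwise fall back to a plain 500 response
    return 500, "Internal Server Error"


def custom_handler(exc):
    # Hypothetical user callback registered for status code 500
    return 500, "custom error page"


error = ValueError("boom")
print(server_error_fallback(error, debug=True, handler=custom_handler))
print(server_error_fallback(error, handler=custom_handler))
print(server_error_fallback(error))
```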

3. Route

In Starlette, routes are divided into two parts. One part, which I call the Router of the Real App, is at the level below the middleware. It is responsible for almost everything in Starlette except the middleware, mainly including route lookup and matching, application startup and shutdown handling, etc. The other part consists of the routes registered to the Router.
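
The startup and shutdown handling mentioned above follows the ASGI lifespan message exchange. The sketch below shows that exchange in isolation under simplifying assumptions: lifespan_context, lifespan_app, and fake_server are hypothetical names, and the fake server queues both of its messages up front, whereas a real server sends the shutdown message only when it is stopping.

```python
import asyncio

events = []


async def lifespan_context(app):
    # Code before the yield runs at startup, code after it at shutdown
    events.append("startup")
    yield
    events.append("shutdown")


async def lifespan_app(scope, receive, send):
    assert scope["type"] == "lifespan"
    await receive()  # consumes "lifespan.startup"
    async for _ in lifespan_context(scope.get("app")):
        await send({"type": "lifespan.startup.complete"})
        await receive()  # consumes "lifespan.shutdown"
    await send({"type": "lifespan.shutdown.complete"})


async def fake_server(app):
    # Hypothetical stand-in for the server side of the exchange
    incoming = asyncio.Queue()
    sent = []

    async def send(message):
        sent.append(message["type"])

    await incoming.put({"type": "lifespan.startup"})
    await incoming.put({"type": "lifespan.shutdown"})
    await app({"type": "lifespan"}, incoming.get, send)
    return sent


sent = asyncio.run(fake_server(lifespan_app))
print(events, sent)
```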

3.1. Router

The Router is straightforward. Its main responsibility is to load and match routes. Here are the source code and comments, excluding the part of loading routes:

class Router:
    def __init__(
        self,
        routes: typing.Sequence[BaseRoute] = None,
        redirect_slashes: bool = True,
        default: ASGIApp = None,
        on_startup: typing.Sequence[typing.Callable] = None,
        on_shutdown: typing.Sequence[typing.Callable] = None,
        lifespan: typing.Callable[[typing.Any], typing.AsyncGenerator] = None,
    ) -> None:
        # Load the information from Starlette initialization
        self.routes = [] if routes is None else list(routes)
        self.redirect_slashes = redirect_slashes
        self.default = self.not_found if default is None else default
        self.on_startup = [] if on_startup is None else list(on_startup)
        self.on_shutdown = [] if on_shutdown is None else list(on_shutdown)

        async def default_lifespan(app: typing.Any) -> typing.AsyncGenerator:
            await self.startup()
            yield
            await self.shutdown()

        # If the initialized lifespan is empty, convert on_startup and on_shutdown to lifespan
        self.lifespan_context = default_lifespan if lifespan is None else lifespan

    async def not_found(self, scope: Scope, receive: Receive, send: Send) -> None:
        """Logic executed when no route is matched"""
        if scope["type"] == "websocket":
            # WebSocket match failed
            websocket_close = WebSocketClose()
            await websocket_close(scope, receive, send)
            return

        # If we're running inside a starlette application then raise an
        # exception, so that the configurable exception handler can deal with
        # returning the response. For plain ASGI apps, just return the response.
        if "app" in scope:
            # In the __call__ method of starlette.applications, we can see that starlette stores itself in scope.
            # The HTTPException raised here is caught by ExceptionMiddleware, which dispatches it to the handler registered for 404.
            raise HTTPException(status_code=404)
        else:
            # For calls not from Starlette, directly return an error
            response = PlainTextResponse("Not Found", status_code=404)
        await response(scope, receive, send)

    async def lifespan(self, scope: Scope, receive: Receive, send: Send) -> None:
        """
        Handle ASGI lifespan messages, which allows us to manage application
        startup and shutdown events.
        """
        # Lifespan execution logic. While it runs, Starlette exchanges startup and shutdown messages with the ASGI server. Some functionality in this part may still be under development.
        first = True
        app = scope.get("app")
        await receive()
        try:
            if inspect.isasyncgenfunction(self.lifespan_context):
                async for item in self.lifespan_context(app):
                    assert first, "Lifespan context yielded multiple times."
                    first = False
                    await send({"type": "lifespan.startup.complete"})
                    await receive()
            else:
                for item in self.lifespan_context(app):  # type: ignore
                    assert first, "Lifespan context yielded multiple times."
                    first = False
                    await send({"type": "lifespan.startup.complete"})
                    await receive()
        except BaseException:
            if first:
                exc_text = traceback.format_exc()
                await send({"type": "lifespan.startup.failed", "message": exc_text})
            raise
        else:
            await send({"type": "lifespan.shutdown.complete"})

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """
        The main entry point to the Router class.
        """
        # The main function for matching and executing routes
        # Currently, only http, websocket, lifespan types are supported
        assert scope["type"] in ("http", "websocket", "lifespan")

        # Initialize the router in scope
        if "router" not in scope:
            scope["router"] = self

        if scope["type"] == "lifespan":
            # Execute lifespan logic
            await self.lifespan(scope, receive, send)
            return

        partial = None

        # Perform route matching
        for route in self.routes:
            match, child_scope = route.matches(scope)
            if match == Match.FULL:
                # If it is a full match (both URL and method match)
                # Then perform normal route processing
                scope.update(child_scope)
                await route.handle(scope, receive, send)
                return
            elif match == Match.PARTIAL and partial is None:
                # If it is a partial match (URL matches, method doesn't match)
                # Then retain the value and continue matching
                partial = route
                partial_scope = child_scope

        if partial is not None:
            # If there is a route with a partial match, also continue execution, but the route will respond with an HTTP method error
            scope.update(partial_scope)
            await partial.handle(scope, receive, send)
            return

        if scope["type"] == "http" and self.redirect_slashes and scope["path"] != "/":
            # Unmatched situation, judge redirection
            redirect_scope = dict(scope)
            if scope["path"].endswith("/"):
                redirect_scope["path"] = redirect_scope["path"].rstrip("/")
            else:
                redirect_scope["path"] = redirect_scope["path"] + "/"

            for route in self.routes:
                match, child_scope = route.matches(redirect_scope)
                if match != Match.NONE:
                    # Match again. If the result is not empty, send a redirection response
                    redirect_url = URL(scope=redirect_scope)
                    response = RedirectResponse(url=str(redirect_url))
                    await response(scope, receive, send)
                    return

        # If nothing above matched, no route was found. The default route is executed, which is 404 Not Found unless another default was configured
        await self.default(scope, receive, send)

The Router code is quite simple, and most of it is concentrated in the __call__ method. However, it traverses the routes more than once, and each route runs a regular expression to decide whether it matches. Some people may think this is slow. I used to think so too, and implemented a route tree to replace it (see route_trie.py for details). But a performance test showed that with up to 50 routes, loop matching is faster than the route tree, and with up to 100 routes the two are equivalent. Under normal circumstances an application does not define more than 100 routes, so there is no need to worry about the matching performance of this part. If you are still worried, you can use Mount to group the routes and reduce the number of matches.
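
The linear matching loop, including the handling of partial matches, can be sketched without Starlette. SimpleRoute, find_route, and the example routes below are hypothetical; only the Match states and the "first full match wins, first partial match is remembered" logic follow the source above.

```python
import re
from enum import Enum


class Match(Enum):
    NONE = 0
    PARTIAL = 1
    FULL = 2


class SimpleRoute:
    # Hypothetical route: a regular expression plus a set of allowed methods
    def __init__(self, pattern, methods, name):
        self.regex = re.compile(pattern)
        self.methods = set(methods)
        self.name = name

    def matches(self, path, method):
        if self.regex.fullmatch(path) is None:
            return Match.NONE
        if method not in self.methods:
            # URL matches, method does not
            return Match.PARTIAL
        return Match.FULL


def find_route(routes, path, method):
    partial = None
    for route in routes:
        match = route.matches(path, method)
        if match is Match.FULL:
            return route, Match.FULL
        if match is Match.PARTIAL and partial is None:
            # Remember the first partial match and keep looking for a full one
            partial = route
    if partial is not None:
        return partial, Match.PARTIAL
    return None, Match.NONE


routes = [
    SimpleRoute(r"/users", {"GET"}, "list_users"),
    SimpleRoute(r"/users", {"POST"}, "create_user"),
    SimpleRoute(r"/users/(?P<user_id>[0-9]+)", {"GET"}, "get_user"),
]

route, match = find_route(routes, "/users", "POST")
print(route.name, match)
```

A partial match does not stop the loop, so a later route that matches both the URL and the method still wins.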

3.2. Other Routes

Mount inherits from BaseRoute, and so do the other routes such as Host and WebSocketRoute. They provide similar methods, with only slight differences in implementation (mainly in initialization, route matching, and reverse lookup). Let's first look at BaseRoute:

class BaseRoute:
    def matches(self, scope: Scope) -> typing.Tuple[Match, Scope]:
        # A standard matching function signature. Each Route should return a (Match, Scope) tuple.
        # Match has 3 types:
        #   NONE: No match.
        #   PARTIAL: Partial match (URL matches, method doesn't match).
        #   FULL: Full match (both URL and method match).
        # Scope will basically return the following format, but Mount returns more content:
        #   {"endpoint": self.endpoint, "path_params": path_params}
        raise NotImplementedError()  # pragma: no cover

    def url_path_for(self, name: str, **path_params: str) -> URLPath:
        # Generate a reverse lookup according to the name
        raise NotImplementedError()  # pragma: no cover

    async def handle(self, scope: Scope, receive: Receive, send: Send) -> None:
        # The function that can be called after being matched by the Router
        raise NotImplementedError()  # pragma: no cover

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """
        A route can be used in isolation as a stand-alone ASGI app.
        This is a somewhat contrived case, as they're almost always used
        within a Router, but could be useful for some tooling and minimal apps.
        """
        # If the route is called as a stand-alone ASGI APP, it will perform matching and respond on its own
        match, child_scope = self.matches(scope)
        if match == Match.NONE:
            if scope["type"] == "http":
                response = PlainTextResponse("Not Found", status_code=404)
                await response(scope, receive, send)
            elif scope["type"] == "websocket":
                websocket_close = WebSocketClose()
                await websocket_close(scope, receive, send)
            return

        scope.update(child_scope)
        await self.handle(scope, receive, send)

It can be seen that BaseRoute does not provide many functions, and other routes are extended based on it:

  • Route: A standard HTTP route. It is responsible for route matching through the HTTP URL and HTTP Method, and then provides a method to call the HTTP route.
  • WebSocketRoute: A standard WebSocket route. It matches routes according to the HTTP URL, and then generates a session through the WebSocket class in starlette.websockets and passes it to the corresponding function.
  • Mount: A nested encapsulation of routes. It matches by URL prefix and forwards requests to the next-level ASGI APP that meets the rules. When its next-level ASGI APP is a Router, the call chain may look like this: Router->Mount->Router->Mount->Router->Route. Using Mount to group routes is recommended, because it reduces the number of routes that have to be tried. It can also distribute requests to other ASGI APPs, such as Starlette->ASGI Middleware->Mount->Other Starlette->...
  • Host: It distributes requests according to the Host in the user request to the corresponding ASGI APP, which can be Route, Mount, middleware, etc.
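
The prefix matching done by Mount can be sketched as follows. PrefixMount is a hypothetical, much reduced stand-in: it matches a fixed string prefix, records the prefix in root_path, and hands the remaining path to the child ASGI APP. How the real Mount fills the child scope is not shown in this article, so that part is an assumption of the sketch.

```python
import asyncio


class PrefixMount:
    # Hypothetical, simplified stand-in for Mount
    def __init__(self, prefix, app):
        self.prefix = prefix.rstrip("/")
        self.app = app

    def matches(self, scope):
        path = scope["path"]
        return path == self.prefix or path.startswith(self.prefix + "/")

    async def __call__(self, scope, receive, send):
        # Assumption of this sketch: the child sees the path without the prefix
        child_scope = dict(scope)
        child_scope["root_path"] = scope.get("root_path", "") + self.prefix
        child_scope["path"] = scope["path"][len(self.prefix):] or "/"
        await self.app(child_scope, receive, send)


seen = []


async def child(scope, receive, send):
    # Stand-in for the next-level ASGI APP, for example another Router
    seen.append((scope["root_path"], scope["path"]))


mount = PrefixMount("/api", child)
scope = {"type": "http", "path": "/api/users"}
if mount.matches(scope):
    asyncio.run(mount(scope, None, None))
print(seen)
```

Grouping routes this way means a request outside the prefix is rejected with a single check instead of being tested against every route in the group.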

4. Other Components

As can be seen from the above, most components in Starlette are designed as ASGI APPs. This sacrifices a little performance, but it makes the components highly compatible with each other. Other components are also more or less designed like ASGI APPs. Before introducing them, let's first look at the overall project structure of Starlette:

├── middleware                       # Middleware
├── applications.py                  # Startup application
├── authentication.py                # Authentication related
├── background.py                    # Encapsulates background tasks, which will be executed after the response is returned
├── concurrency.py                   # Some small asyncio-related encapsulations. In the new version, the anyio library is directly used instead.
├── config.py                        # Configuration
├── convertors.py                    # Some type conversion methods
├── datastructures.py                # Some data structures, such as Url, Header, Form, QueryParam, State, etc.
├── endpoints.py                     # Routes that support cbv and a slightly more advanced WebSocket encapsulation
├── exceptions.py                    # Exception handling
├── formparsers.py                   # Parsing of Form, File, etc.
├── graphql.py                       # Responsible for handling GraphQL related
├── __init__.py
├── py.typed                         # Type hints needed by Starlette
├── requests.py                      # Requests, for users to obtain data
├── responses.py                     # Responses, responsible for initializing headers and cookies, generating response data according to different Response classes, and then having a class ASGI call interface. This interface will send the ASGI protocol to the Uvicorn service. After sending, if there are background tasks, they will be executed until completion.
├── routing.py                       # Routing
├── schemas.py                       # OpenApi related schemas
├── staticfiles.py                   # Static files
├── status.py                        # HTTP status codes
├── templating.py                    # Template responses based on Jinja
├── testclient.py                    # Test client
├── types.py                         # Types
└── websockets.py                    # WebSocket

There are many files above, and some simpler ones will be skipped.
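
The description of responses.py in the listing above (build headers, expose an ASGI call interface, send the messages, then run background tasks) can be pictured with a small standard-library sketch. `SketchResponse` and `write_audit_log` are hypothetical names, and this is a simplified model under those assumptions rather than Starlette's actual Response class.

```python
import asyncio


class SketchResponse:
    """Hypothetical, simplified response: an object that is itself ASGI-callable."""

    def __init__(self, content, status_code=200, background=None):
        self.body = content.encode("utf-8")
        self.status_code = status_code
        self.background = background  # optional zero-argument coroutine function
        self.raw_headers = [
            (b"content-length", str(len(self.body)).encode("latin-1")),
            (b"content-type", b"text/plain; charset=utf-8"),
        ]

    async def __call__(self, scope, receive, send):
        # First the status and headers, then the body.
        await send({
            "type": "http.response.start",
            "status": self.status_code,
            "headers": self.raw_headers,
        })
        await send({"type": "http.response.body", "body": self.body})
        # Only after the response has been handed to the server is the task run.
        if self.background is not None:
            await self.background()


async def demo():
    events = []

    async def send(message):
        events.append(message["type"])

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def write_audit_log():
        events.append("background task")

    response = SketchResponse("Hello, world!", background=write_audit_log)
    await response({"type": "http"}, receive, send)
    return events


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The order of the recorded events shows why background work does not delay the response: both ASGI messages are sent before the task starts.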

4.1. Request

The Request class is very simple. It inherits from HTTPConnection, which mainly parses the scope passed in by the ASGI protocol to extract information such as the URL and method. Request adds the ability to read the request data and to push data to the client (server push is an HTTP/2 feature). Reading data depends on one core function, stream. Its source code is as follows:

async def stream(self) -> typing.AsyncGenerator[bytes, None]:
    # If it has been read, get data from the cache
    if hasattr(self, "_body"):
        yield self._body
        yield b""
        return

    if self._stream_consumed:
        raise RuntimeError("Stream consumed")

    self._stream_consumed = True
    while True:
        # Continuously get data from the receive loop of the ASGI container
        message = await self._receive()
        if message["type"] == "http.request":
            body = message.get("body", b"")
            if body:
                # Return the data if it is not empty
                yield body
            if not message.get("more_body", False):
                # It means that all body data has been obtained
                break
        elif message["type"] == "http.disconnect":
            # It means that the connection with the client has been closed
            self._is_disconnected = True
            # Throw an exception. Users calling `await request.body()` or `await request.json()` will throw an exception.
            raise ClientDisconnect()
    # Return empty bytes to mark the end
    yield b""
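
To see how a caching body reader can sit on top of stream, the sketch below reproduces the loop above in a hypothetical `MiniRequest` class and feeds it from a fake receive function. `MiniRequest`, `make_receive` and the use of `ConnectionError` in place of ClientDisconnect are assumptions made for illustration; the `body` method is written from the caching idea described in this article, not copied from Starlette.

```python
import asyncio


class MiniRequest:
    """Hypothetical stand-in for Request, reduced to stream() and body()."""

    def __init__(self, receive):
        self._receive = receive
        self._stream_consumed = False

    async def stream(self):
        if hasattr(self, "_body"):
            yield self._body
            yield b""
            return
        if self._stream_consumed:
            raise RuntimeError("Stream consumed")
        self._stream_consumed = True
        while True:
            message = await self._receive()
            if message["type"] == "http.request":
                body = message.get("body", b"")
                if body:
                    yield body
                if not message.get("more_body", False):
                    break
            elif message["type"] == "http.disconnect":
                raise ConnectionError("client disconnected")
        yield b""

    async def body(self):
        # Join every chunk once, then serve later calls from the cache.
        if not hasattr(self, "_body"):
            chunks = []
            async for chunk in self.stream():
                chunks.append(chunk)
            self._body = b"".join(chunks)
        return self._body


def make_receive(chunks):
    """Build a receive() that replays the given chunks as http.request messages."""
    pending = list(chunks)
    calls = {"count": 0}

    async def receive():
        calls["count"] += 1
        chunk = pending.pop(0)
        return {"type": "http.request", "body": chunk, "more_body": bool(pending)}

    return receive, calls


async def demo():
    receive, calls = make_receive([b"hello ", b"", b"world"])
    request = MiniRequest(receive)
    first = await request.body()
    second = await request.body()  # served from the cache, receive() is not called again
    return first, second, calls["count"]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The second call to `body` never touches receive, which is why one Request object can be read repeatedly while a second Request object built from the same receive cannot.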

The implementation of stream is simple, but there is a small bug. Those familiar with Nginx or other web servers know that intermediate servers generally do not process the body; they only pass it on. The same is true for ASGI. After processing the URL and headers, Uvicorn calls the ASGI APP and passes the send and receive objects down. These two objects pass through multiple ASGI APPs before reaching the route ASGI APP, where user functions use them. So the receive object that a Request gets is generated by Uvicorn, and the data behind receive comes from an asyncio.Queue. As the middleware analysis showed, each ASGI APP builds its own Request object from scope and receive, so the Request objects at different layers are not the same object and do not share a cached body. If a middleware reads the body through its Request object, it consumes the queued data through receive ahead of time, and subsequent ASGI APPs can no longer read the body through their own Request objects. The sample code for this problem is as follows:

import asyncio
from starlette.applications import Starlette
from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
from starlette.requests import Request
from starlette.responses import JSONResponse, Response

app: Starlette = Starlette()

class DemoMiddleware(BaseHTTPMiddleware):
    async def dispatch(
        self, request: Request, call_next: RequestResponseEndpoint
    ) -> Response:
        print(request, await request.body())
        return await call_next(request)

app.add_middleware(DemoMiddleware)

@app.route("/")
async def demo(request: Request) -> JSONResponse:
    try:
        await asyncio.wait_for(request.body(), 1)
        return JSONResponse({"result": True})
    except asyncio.TimeoutError:
        return JSONResponse({"result": False})

if __name__ == "__main__":
    import uvicorn  # type: ignore
    uvicorn.run(app)

Run it and check the result of the request:

-> curl http://127.0.0.1:8000
{"result":false} 

As shown above, the result is false: request.body() timed out because the receive queue was already empty and no data could be retrieved. Without the timeout, this request would hang.
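
The same effect can be reproduced without Starlette or Uvicorn. The sketch below is a simplified simulation: an `asyncio.Queue` stands in for the server side, and the hypothetical helper `read_body` plays the role of both readers. It is not Uvicorn's actual implementation.

```python
import asyncio


async def read_body(receive):
    """Collect http.request messages until more_body is false."""
    chunks = []
    while True:
        message = await receive()
        chunks.append(message.get("body", b""))
        if not message.get("more_body", False):
            return b"".join(chunks)


async def demo():
    # Stand-in for the server side: the request body is queued exactly once.
    queue = asyncio.Queue()
    await queue.put({"type": "http.request", "body": b'{"id": 1}', "more_body": False})

    async def receive():
        return await queue.get()

    # First reader (think: the middleware) drains the queue.
    first = await read_body(receive)

    # Second reader (think: the route) finds nothing left and would wait forever.
    try:
        second = await asyncio.wait_for(read_body(receive), timeout=0.1)
    except asyncio.TimeoutError:
        second = None
    return first, second


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The first reader gets the full body, while the second one only returns because of the timeout, mirroring the false result from curl.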

To solve this problem, let's first look at how Request gets the body. Because the user can read the body multiple times and always gets the same data, Request caches the data after the first read. We can follow the same idea. Since the data is retrieved via receive, we can construct a new receive function after reading the data. This function returns a message in the shape of the ASGI protocol that carries the complete body, which is all Request.stream needs to rebuild the body. The code is as follows:

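from starlette.requests import Request
from starlette.types import Message


async def proxy_get_body(request: Request) -> bytes:
    async def receive() -> Message:
        # Replay the complete body as a single ASGI message (no more_body).
        return {"type": "http.request", "body": body}

    # Read (and cache) the body first, then swap in the replaying receive.
    body = await request.body()
    request._receive = receive
    return body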

After that, any layer of ASGI APP that needs the body can call this function to get it without affecting the ability of subsequent ASGI APPs to read the body.
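
The replay idea also works one level lower, directly on the ASGI callables. The following standard-library sketch assumes a hypothetical `BodyLoggingMiddleware` that reads the whole body and then hands a replaying receive to the inner app; `read_body`, `echo_app` and the queue that stands in for the server are likewise illustrative names, not Starlette or Uvicorn APIs.

```python
import asyncio


async def read_body(receive):
    """Collect http.request messages until more_body is false."""
    chunks = []
    while True:
        message = await receive()
        chunks.append(message.get("body", b""))
        if not message.get("more_body", False):
            return b"".join(chunks)


class BodyLoggingMiddleware:
    """Hypothetical ASGI middleware that reads the body, then replays it downstream."""

    def __init__(self, app, seen):
        self.app = app
        self.seen = seen

    async def __call__(self, scope, receive, send):
        body = await read_body(receive)
        self.seen.append(body)

        async def replay_receive():
            # Hand the cached body to whoever reads next, as one complete message.
            return {"type": "http.request", "body": body, "more_body": False}

        await self.app(scope, replay_receive, send)


async def echo_app(scope, receive, send):
    """Inner app that reads the body again and echoes it back."""
    body = await asyncio.wait_for(read_body(receive), timeout=0.1)
    await send({"type": "http.response.start", "status": 200, "headers": []})
    await send({"type": "http.response.body", "body": body})


async def demo():
    # Stand-in for the server side: the request body is queued exactly once.
    queue = asyncio.Queue()
    await queue.put({"type": "http.request", "body": b"ping", "more_body": False})
    sent, seen = [], []

    async def send(message):
        sent.append(message)

    app = BodyLoggingMiddleware(echo_app, seen)
    await app({"type": "http"}, queue.get, send)
    return seen, sent[-1]["body"]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

One trade-off of this approach is that the replaying receive returns the same http.request message on every call, so code downstream that waits on receive for an http.disconnect message will not see one.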

5. Summary

By now, we have analyzed several important parts of Starlette's code. Starlette is an excellent library with a great design concept. Reading the Starlette source code yourself is recommended, and it will help when you write your own frameworks in the future.

Leapcell: The Best Serverless Platform for Python app Hosting


Finally, I'd like to share the most suitable platform for deploying FastAPI: Leapcell

1. Multi-Language Support

  • Develop with JavaScript, Python, Go, or Rust.

2. Deploy unlimited projects for free

  • Pay only for usage: no requests, no charges.

3. Unbeatable Cost Efficiency

  • Pay-as-you-go with no idle charges.
  • Example: $25 supports 6.94M requests at a 60ms average response time.

4. Streamlined Developer Experience

  • Intuitive UI for effortless setup.
  • Fully automated CI/CD pipelines and GitOps integration.
  • Real-time metrics and logging for actionable insights.

5. Effortless Scalability and High Performance

  • Auto-scaling to handle high concurrency with ease.
  • Zero operational overhead: just focus on building.


Explore more in the documentation!

Leapcell Twitter: https://x.com/LeapcellHQ
