DataLoader

DataLoader is a core component of pydantic-resolve. It comes from aiodataloader, an independent third-party library that is often used as a dependency in GraphQL-related libraries.

It can solve the N+1 query problem in GraphQL by combining multiple concurrent queries into a single batch query to optimize performance.

The internal mechanism of pydantic-resolve is somewhat similar to GraphQL's, so DataLoader can be used directly to handle data loading. Simple DataLoaders can even be reused across pydantic-resolve and other Python GraphQL frameworks.

For example, in the code introduced in the introduction, get_cars_by_child is triggered once for every child, causing the N+1 query problem. We can solve this with DataLoader.

from typing import List
from pydantic import BaseModel
from pydantic_resolve import Resolver, LoaderDepend

# Car and CarLoader come from the introduction example
class Child(BaseModel):
    id: int
    name: str

    cars: List[Car] = []
    # before: one query per child (N+1)
    # async def resolve_cars(self):
    #     return await get_cars_by_child(self.id)

    # after: child ids are collected and fetched in a single batch
    async def resolve_cars(self, loader=LoaderDepend(CarLoader)):
        return await loader.load(self.id)

    description: str = ''
    def post_description(self):
        desc = ', '.join([c.name for c in self.cars])
        return f'{self.name} owns {len(self.cars)} cars, they are: {desc}'

children = await Resolver().resolve([
    Child(id=1, name="Titan"), Child(id=2, name="Siri")])

Sample

erDiagram
    User ||--o{ Blog : owns
    Blog ||--o{ Comment : owns
    User {
        int id
        string name
    }
    Blog {
        int id
        int user_id
        string title
        string content
    }
    Comment {
        int id
        int blog_id
        int user_id
        string content
    }

import asyncio
import json
from typing import Optional
from pydantic import BaseModel
from pydantic_resolve import Resolver, build_object, build_list, LoaderDepend
from aiodataloader import DataLoader

# Schemas / Entities
class Comment(BaseModel):
    id: int
    content: str
    user_id: int

class Blog(BaseModel):
    id: int
    title: str
    content: str

class User(BaseModel):
    id: int
    name: str


# Loaders / relationships
class CommentLoader(DataLoader):
    async def batch_load_fn(self, comment_ids):
        comments = [
            dict(id=1, content="world is beautiful", blog_id=1, user_id=1),
            dict(id=2, content="Mars is beautiful", blog_id=2, user_id=2),
            dict(id=3, content="I love Mars", blog_id=2, user_id=3),
        ]
        return build_list(comments, comment_ids, lambda c: c['blog_id'])

class UserLoader(DataLoader):
    async def batch_load_fn(self, user_ids):
        users = [ dict(id=1, name="Alice"), dict(id=2, name="Bob"), ]
        return build_object(users, user_ids, lambda u: u['id'])


# Compose schemas and dataloaders together
class CommentWithUser(Comment):
    user: Optional[User] = None
    def resolve_user(self, loader=LoaderDepend(UserLoader)):
        return loader.load(self.user_id)

class BlogWithComments(Blog):
    comments: list[CommentWithUser] = []
    def resolve_comments(self, loader=LoaderDepend(CommentLoader)):
        return loader.load(self.id)


# Run
async def main():
    raw_blogs = [
        dict(id=1, title="hello world", content="hello world detail"),
        dict(id=2, title="hello Mars", content="hello Mars detail"),
    ]
    blogs = await Resolver().resolve([BlogWithComments.parse_obj(b) for b in raw_blogs])
    print(json.dumps(blogs, indent=2, default=lambda o: o.dict()))

asyncio.run(main())
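
Running this prints roughly the following; note that comment 3 references user id 3, which UserLoader does not return, so its user stays null:

[
  {
    "id": 1,
    "title": "hello world",
    "content": "hello world detail",
    "comments": [
      { "id": 1, "content": "world is beautiful", "user_id": 1,
        "user": { "id": 1, "name": "Alice" } }
    ]
  },
  {
    "id": 2,
    "title": "hello Mars",
    "content": "hello Mars detail",
    "comments": [
      { "id": 2, "content": "Mars is beautiful", "user_id": 2,
        "user": { "id": 2, "name": "Bob" } },
      { "id": 3, "content": "I love Mars", "user_id": 3, "user": null }
    ]
  }
]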

Note: The purpose of this example is to demonstrate the ability to fetch related data. People often ask how to handle pagination. Generally, pagination is used when there is a large amount of root data, such as a paginated User list.

If you want to paginate Blogs, one way is to treat Blogs as the root data entry and paginate using api/blog?limit=10&offset=0.

Alternatively, you can use a single User and paginate the blogs with the context parameter. In this case, you cannot use DataLoader and need to replace it with a single method like get_blogs_by_user_id_with_pagination.
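
A minimal sketch of this second approach, assuming a hypothetical get_blogs_by_user_id_with_pagination helper and passing the page through Resolver's context parameter:

class UserWithBlogs(User):
    blogs: list[Blog] = []
    async def resolve_blogs(self, context):
        # context is injected by the Resolver below; the helper is hypothetical
        return await get_blogs_by_user_id_with_pagination(
            self.id, limit=context['limit'], offset=context['offset'])

user = await Resolver(context={'limit': 10, 'offset': 0}).resolve(
    UserWithBlogs(id=1, name="Alice"))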

Additionally, DataLoader can add range limits, such as fetching a batch of users' blogs from the last N days or comments from the last M days. This approach aligns with the data volume displayed in the UI.
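
A sketch of such a range-limited loader; the days attribute and get_recent_blogs_by_user_ids are hypothetical, and setting loader attributes is covered in "Providing Parameters and Cloning" below:

class RecentBlogLoader(DataLoader):
    days: int = 30  # only fetch blogs from the last N days

    async def batch_load_fn(self, user_ids):
        blogs = await get_recent_blogs_by_user_ids(user_ids, days=self.days)
        return build_list(blogs, user_ids, lambda b: b['user_id'])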

Creating a DataLoader

There are two ways to create a DataLoader object. One is by inheritance:

from aiodataloader import DataLoader

class UserLoader(DataLoader):
    max_batch_size = 20
    async def batch_load_fn(self, keys):
        return await my_batch_get_users(keys)

user_loader = UserLoader()

The inheritance approach lets you set aiodataloader parameters, for example max_batch_size = 20, which slices the keys into chunks of 20 and processes the batches concurrently.

These parameter names are reserved by aiodataloader, so take care not to reuse them when adding parameters of your own.

The other way is to define a standalone async def batch_load_fn(keys) function; pydantic-resolve will create an instance internally with DataLoader(batch_load_fn).
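
For example (a sketch; my_batch_get_users and the Item model are placeholders, not part of the library):

async def batch_load_fn(user_ids):
    users = await my_batch_get_users(user_ids)
    return build_object(users, user_ids, lambda u: u['id'])

class Item(BaseModel):
    user_id: int

    user: Optional[User] = None
    def resolve_user(self, loader=LoaderDepend(batch_load_fn)):
        return loader.load(self.user_id)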

Generally, it is recommended to use the first method, and the reasons will be explained later.

Next, we will introduce a series of features provided by pydantic-resolve based on DataLoader:

Multi-layer Data Stitching

Take Company, Office, and Member as examples. Office and Member each have a DataLoader that returns their data, as long as the corresponding company_id or office_id is provided.

  • OfficeLoader gets the corresponding Office list through company_id
  • MemberLoader gets the Member list through office_id

Taking OfficeLoader as an example: pass in company_ids, and it returns lists of offices in the same order as company_ids.

For example, the input company_ids = [1, 2, 3] returns [[office_1, office_2], [office_3], [office_4]], meaning company 1 has [office_1, office_2], and so on.

build_list is a helper method provided by pydantic-resolve that simplifies this assembly process.
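
A toy illustration of its behavior, with the office data inlined (o1 to o4 stand for the office dicts):

offices = [
    dict(id=1, company_id=1, name='o1'),
    dict(id=2, company_id=1, name='o2'),
    dict(id=3, company_id=2, name='o3'),
    dict(id=4, company_id=3, name='o4'),
]
build_list(offices, [1, 2, 3], lambda x: x['company_id'])
# -> [[o1, o2], [o3], [o4]], aligned with the input company_ids

The loaders below use it to group offices by company_id and members by office_id: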

from typing import List
from pydantic import BaseModel
from aiodataloader import DataLoader
from pydantic_resolve import Resolver, LoaderDepend, build_list

class OfficeLoader(DataLoader):
    async def batch_load_fn(self, company_ids):
        offices = await get_offices_by_company_ids(company_ids)  # placeholder query
        return build_list(offices, company_ids, lambda x: x['company_id'])  # assuming each office is a dict

class MemberLoader(DataLoader):
    async def batch_load_fn(self, office_ids):
        members = await get_members_by_office_ids(office_ids)  # placeholder query
        return build_list(members, office_ids, lambda x: x['office_id'])

class Member(BaseModel):
    id: int
    office_id: int
    name: str

class Office(BaseModel):
    id: int
    company_id: int
    name: str

    members: List[Member] = []
    def resolve_members(self, loader=LoaderDepend(MemberLoader)):
        return loader.load(self.id)  # returns a Future

class Company(BaseModel):
    id: int
    name: str

    offices: List[Office] = []
    def resolve_offices(self, loader=LoaderDepend(OfficeLoader)):
        return loader.load(self.id)  # returns a Future

companies = [
    Company(id=1, name='Aston'),
    Company(id=2, name="Nicc"),
    Company(id=3, name="Carxx"),
]
companies = await Resolver().resolve(companies)

In this code, Company is extended with offices, and each Office is extended with members. During the entire process, batch_load_fn in OfficeLoader and in MemberLoader executes only once each, no matter how many companies and offices there are.

Of course, if max_batch_size is relatively small, several batch_load_fn calls may run concurrently.
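
For example, with max_batch_size = 2, resolving the three companies above splits the keys [1, 2, 3] into two batch_load_fn calls that run concurrently:

class OfficeLoader(DataLoader):
    max_batch_size = 2  # -> batch_load_fn([1, 2]) and batch_load_fn([3])

    async def batch_load_fn(self, company_ids):
        offices = await get_offices_by_company_ids(company_ids)
        return build_list(offices, company_ids, lambda x: x['company_id'])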

Providing Parameters and Cloning

DataLoader can add parameters, but be careful to avoid conflicts with the default parameters in aiodataloader: batch, max_batch_size, cache, cache_key_fn, cache_map.

For example, add a status parameter to OfficeLoader to filter open offices.

from typing import Literal

class OfficeLoader(DataLoader):
    status: Literal['open', 'closed', 'inactive']

    async def batch_load_fn(self, company_ids):
        offices = await get_offices_by_company_ids_by_status(company_ids, self.status)
        return build_list(offices, company_ids, lambda x: x['company_id'])

The defined parameters can be set in the Resolver through loader_params.

companies = [
    Company(id=1, name='Aston'),
    Company(id=2, name="Nicc"),
    Company(id=3, name="Carxx")
]
companies = await Resolver(
    loader_params={
        OfficeLoader: {
            'status': 'open'
        }
    }
).resolve(companies)

There is still a problem here. What if you need to get both open and closed data at the same time?

class Company(BaseModel):
    id: int
    name: str

    open_offices: List[Office] = []
    def resolve_open_offices(self, loader=LoaderDepend(OfficeLoader)):
        return loader.load(self.id)

    closed_offices: List[Office] = []
    def resolve_closed_offices(self, loader=LoaderDepend(OfficeLoader)):
        return loader.load(self.id)

Different query conditions cannot be served by a single OfficeLoader, so pydantic-resolve provides a tool to clone it into two loader classes.

from pydantic_resolve import copy_dataloader_kls

OfficeLoader1 = copy_dataloader_kls('OfficeLoader1', OfficeLoader)
OfficeLoader2 = copy_dataloader_kls('OfficeLoader2', OfficeLoader)

class Company(BaseModel):
    id: int
    name: str

    open_offices: List[Office] = []
    def resolve_open_offices(self, loader=LoaderDepend(OfficeLoader1)):
        return loader.load(self.id)

    closed_offices: List[Office] = []
    def resolve_closed_offices(self, loader=LoaderDepend(OfficeLoader2)):
        return loader.load(self.id)

companies = [
    Company(id=1, name='Aston'),
    Company(id=2, name="Nicc"),
    Company(id=3, name="Carxx")
]
companies = await Resolver(
    loader_params={
        OfficeLoader1: {
            'status': 'open'
        },
        OfficeLoader2: {
            'status': 'closed'
        }
    }
).resolve(companies)

Handling DataLoader Return Values

If you need to process the return value of loader.load(self.id), you can change resolve_offices to async.

class Company(BaseModel):
    id: int
    name: str

    offices: List[Office] = []
    async def resolve_offices(self, loader=LoaderDepend(OfficeLoader)):
        offices = await loader.load(self.id)
        return [of for of in offices if of['status'] == 'open']

In addition, pydantic-resolve does not restrict the parameter names or the number of loaders, so the following usage is also allowed (though this particular example is a bit contrived).

class Company(BaseModel):
    id: int
    name: str

    offices: List[Office] = []
    async def resolve_offices(
            self,
            office_loader=LoaderDepend(OfficeLoader),
            manager_loader=LoaderDepend(ManagerLoader)):
        offices = await office_loader.load(self.id)
        managers = await manager_loader.load(self.id)

        offices = [of for of in offices if of['manager'] in managers]
        return offices

Pre-generating DataLoader Instances

You can create DataLoader instances in advance and prime them with data, so that DataLoader's cache mechanism avoids query overhead during the resolve() phase.

This can be helpful in some specific scenarios.

loader = SomeLoader()
# prime(key, value) pre-fills the cache, so load(key) resolves
# from the cache without triggering batch_load_fn
loader.prime('tangkikodo', ['tom', 'jerry'])
loader.prime('john', ['mike', 'wallace'])
data = await Resolver(loader_instances={SomeLoader: loader}).resolve(data)

Viewing DataLoader Instance Data

If you want to know which DataLoader instances were created and what data they loaded, you can print them out to check.

resolver = Resolver()
data = await resolver.resolve(data)
print(resolver.loader_instance_cache)