databases 连接 pg 查询时返回的数据类型

Python 小记 2020-10-06 3334 字 1318 浏览 点赞

使用 databases 做异步查询的时候,sqlite、mysql 查询结果返回的都是元组类型,但是连接 pg 数据库的时候返回的是 Record 对象实例。

database = Database(
    'postgresql://postgres:password@192.168.111.136:54321/postgres', 
    min_size=5, 
    max_size=20
)
await database.connect()
query = "SELECT score, name, id FROM HighScores"
rows = await database.fetch_all(query=query)  # 查询
print(rows)  # 打印结果
await database.disconnect()

输出:

[<databases.backends.postgres.Record object at 0x000001BADB1E2A08>]

乍看就觉得麻烦了,毕竟连接不同的数据库时查询会返回不同的结果,大大降低了代码的灵活性,还要引入 if 做类型判断。

但是点进 databases 源码可以看到 Record 的代码:

# database/backends/postgres.py

class Record(Mapping):
    def __init__(
        self, row: asyncpg.Record, result_columns: tuple, dialect: Dialect
    ) -> None:
        ...

    def values(self) -> typing.ValuesView:
        return self._row.values()

    def __iter__(self) -> typing.Iterator:
        return iter(self._row.keys())

    def __len__(self) -> int:
        return len(self._row)

可以发现,Record 类接收了 row 参数,而这个参数的类型是 asyncpg.Record。由于 asyncpg.Record 代码是 C 语言实现,去分析 C 代码就麻烦了。但是查询 asyncpg 官方文档,得到一个惊喜:

Each row (or composite type value) returned by calls to fetch* methods is represented by an instance of the Record object. Record objects are a tuple-/dict-like hybrid, and allow addressing of items either by a numeric index or by a field name.

asyncpg.Record 本来就可以是元组,事儿好办了许多。


第一种方案粗暴一些,既然 mysql、sqlite 返回的就是元组,那么 pg 也得返回元组。

import asyncpg
import databases.backends.postgres
from sqlalchemy.engine.interfaces import Dialect

class RecordMeta(type):
    def __call__(cls, row: asyncpg.Record, result_columns: tuple, dialect: Dialect):
        return tuple(row)  # https://magicstack.github.io/asyncpg/current/api/index.html?highlight=record#record-objects

class Record(metaclass=RecordMeta):
    def __new__(cls, *args, **kwargs):
        pass

databases.backends.postgres.Record = Record  # 补丁


import asyncio

async def main():
    database = Database('postgresql://postgres:password@192.168.111.136:5432/postgres', min_size=5, max_size=20)
    await database.connect()

    query = "SELECT score, name, id FROM HighScores"
    rows = await database.fetch_all(query=query)
    print(rows)  # 输出:[(92, 'Daisy', 1), (87, 'Neil', 2), (43, 'Carol', 3)]

    await database.disconnect()

if __name__ == "__main__":
    asyncio.run(main())

第二种方案。看 database.backends.postgres.Record 源码就知道,这个类有许多实例属性,留下来或许有别的用处,不能一竿子打翻一船人。

此时我们可以关注 Record 下的 __iter__ 方法。

class Record(Mapping):
    ...

    def __iter__(self) -> typing.Iterator:
        return iter(self._row.keys())

就是说,我们对 Record 实例用 tuple 方法时,返回的是数据的列名,用处不大。让它返回数据的值就好。

import databases.backends.postgres

databases.backends.postgres.Record.__iter__ = lambda self: iter(self._row)  # 补丁


import asyncio

async def main():
    ...
    rows = await database.fetch_all(query=query)
    print(rows)  # 输出:[<databases.backends.postgres.Record object at 0x0000024470402B88>]
    print([tuple(r) for r in rows])  # 输出:[(92, 'Daisy', 1)]
    await database.disconnect()

if __name__ == "__main__":
    asyncio.run(main())

由于 mysql、sqlite 返回的就是元组,而对元组使用 tuple 函数返回本身,对查询结果不会有任何改变。可以统一作此处理。



本文由 Guan 创作,采用 知识共享署名 3.0,可自由转载、引用,但需署名作者且注明文章出处。

还不快抢沙发

添加新评论