从MongoDB迁移到PostgreSQL

in DB with 0 comment

原因

之前写的一个项目,由于数据都是json,就直接使用了MongoDB进行数据存储,但是由于读写较为频繁,MongoDB较低的性能就不符合需求了,遂有了迁移的念头。

迁移的是一个扁平结构的collection,选择PostgreSQL是因为有jsonb的支持,部分字段我还是有json需求的。

前期准备

将需要的字段都整理出来,然后在PostgreSQL里建好库。

实施过程

使用mongodbexport导出数据为csv

mongoexport -d wzry -c collection_name --type csv --fields "field1,field2"  --out out.csv

在PostgreSQL命令行导入csv

COPY table_name 
FROM '/out.csv'
DELIMITER ','
CSV HEADER;

这里要注意一个坑,尽量不要使用图形化管理软件,navicat我显示不出创建好的表,datagrip有个从文件导入的功能,理论上应该是可以使用,但是不知道为什么不识别jsonb,说转换失败,但实际上在命令行可以轻松导入。

这里还会对数据进行检查,比如应该是bigint,但是表结构只有int,会报错。

到这数据导入就结束了,非常轻松,300万条数据也很快。

代码修改

python代码改了我一天时间。。。golang还没改。。。这就只说python了

首先自然是driver库的选择,最正统的库自然是psycopg2,要orm的话SQLAlchemy也是可以。但是我需要一个异步的库,就看到了asyncpg

性能对比
这张图还是蛮有吸引力的

不过这就只能手写sql了

其实查询都好说,真正的难点出现在如何实现MongoDB里的Upsert功能,也就是PostgreSQL如何实现insert or update,实际上这个关键词在google能搜索到一大堆,官方文档也有说明,就是利用 ON CONFLICT

但是!有如下还需解决:

解决问题

---写到这突然懒了

任意字段

编写函数,for循环很方便

高性能批量提交

可以参考stackoverflow

asyncpg提供了executemany,但是目测是跟psycopg2的一个样,只是多条语句多次提交,性能堪忧,所以需要自行优化,将多条语句合并成一条。values (record1....),(record2....),(record3....)

并且的话为了要输入数据,就是构建安全的语句,我们不能像conn.execute(sql, value1, value2)这样子做,于是找了找半天终于找到psycopg2的cursor.mogrify可以构建语句而不执行。不过因为是cursor游标类的函数,需要初始化,也就是需要一个连接(实际上用不到),怎么避免,其实可以自己hack一下,也挺有意思的。

inserted count and modified count

可以参考stackoverflow

然后结合以上几点配合大量循环就可以编写一个通用的函数完成upsert语句的构建

这是我写的一个没那么通用的函数,部分写死了,有一些内容没贴出来,所以直接copy没法用,但是如果有编程能力的应该都能够看得懂自己修改了,欢迎留言

def test(data):
    fields = tuple(i[0].data.keys())
    _sql = f'''WITH t AS  (INSERT INTO public.collection_name ({'"' + '","'.join(fields) + '"'})
        VALUES %s
        ON CONFLICT ("Id") DO UPDATE SET {','.join(['"' + _x + '" = EXCLUDED."' + _x + '"' for _x in fields if _x != 'Id'])} RETURNING xmax)
        SELECT COUNT(*) AS rows,
        SUM(CASE WHEN xmax = 0 THEN 1 ELSE 0 END) AS inserted,
        SUM(CASE WHEN xmax::text::int > 0 THEN 1 ELSE 0 END) AS updated
        FROM t;''' % data_many(data, keys=fields)
    res = await conn.fetchrow(_sql)
    result = BulkResult(result.rows + (res['rows'] or 0),
        result.inserted + (res['inserted'] or 0),
        result.updated + (res['updated'] or 0))
def data_many(ops, keys=None, jsonb=False):
    return ','.join(tuple(data_format(x, keys=keys, jsonb=jsonb) for x in ops))

def data_format(args, keys, jsonb=False):
    if keys:
        args = tuple(args.data[x] for x in keys)
    try:
        return __cur.mogrify(f"({','.join(tuple('%s' if not jsonb or not isinstance(args[_], Json) else '%s::jsonb' for _ in range(len(args))))})", args).decode('utf-8')
    except psycopg2.ProgrammingError as e:
        print(args)
        raise e

批量update的问题

可以参考stackoverflow

批量update会面临一个问题就是因为需要构建这么一个SQL

update test as t set
    column_a = c.column_a
from (values
    ('123', 1),
    ('345', 2)  
) as c(column_b, column_a) 
where c.column_b = t.column_b;

其实如果前面的问题都自己解决,那么这个问题本来也不应该有。但是这下问题出现jsonb上。

如果按照之前的方式来进行语句构建报错

asyncpg.exceptions.DatatypeMismatchError: column "xxx" is of type jsonb but expression is of type text

其实如果是UPDATE TABLE SET xxx='[1,2]' WHERE 1,是没有问题的。

但是如果在values里将jsonb数据用'[1,2]'方式,就会报错。

这个问题困扰了好一段时间,终于找到是要添加::jsonb标识,方案已经包含在上方代码了。

写在后面

事实上迁移过程踩坑主要也有第一次使用PostgreSQL的原因,但是这么一弄,算是对PostgreSQL有了比较多的了解。

而且迁移带来的效益也是很高的。MongoDB实在不适合在高读写频次的场景使用。

Responses