Яндекс.Метрика

    Песочница

    PostgreSQL: Уникальные ключи для распределенной базы. Практика

    По следам статьи Уникальный ключ в условиях распределенной БД.

    У нас есть база которую мы хотим разделить. В идеальном случае хочется сделать master-master. Один из самых сложных моментов, это обеспечение уникальности ключей на всех серверах. И хорошо если база изначально проектировалась с учетом масштабирования… Опять же, это что-то из области идеала, который встречается, скажем так — не часто.

    Итак у нас есть база которую нужно подготовить к синхронизации master-master — сделаем все ключи в нашей базе уникальными в пределах проекта.

    В упомянутой статье рассматривались несколько вариантов, но мы остановимся на одном предложенным Instagram


    Шаг 1 — Перевод всех ключей в bigint



    Здесь подразумевается, что все наши primary ключи называются id, и соответственно поля которые ссылаются на эти ключи названы подобным образом: order_id, client_id, table_id…

    Создадим функцию которая переводит поле integer в bigint
    DROP FUNCTION IF EXISTS "field_int2big" (field text, tablename text, table_schema text);
    CREATE OR REPLACE FUNCTION "field_int2big" (field text, tablename text, table_schema text) RETURNS bigint AS 
    $body$  
      DECLARE 
      BEGIN 
      EXECUTE 'ALTER TABLE '|| table_schema || '."'|| tablename || '" ALTER COLUMN "'|| field || '" TYPE bigint;' ;
      return 1;
      END;  
    $body$  LANGUAGE 'plpgsql';
    


    Дальше выбираем все integer поля и конвертирум их:
    select *, field_int2big(column_name, table_name, table_schema)  from 
    (select table_catalog, table_schema, table_name, column_name, data_type
     from information_schema.columns where table_schema in ('public', 'myscheme') 
     and data_type in ('integer', 'oid') and (position('id' in column_name)>0 OR column_name in ('key', 'myfield'))
    order by table_catalog, table_schema, table_name, column_name 
    limit 10 offset 0) c
    

    Несколько вещей на которые нужно обратить внимание:
    1. Вы можете/должны добавить свои схемы: table_schema in ('public', 'myscheme')
    2. Также можете добавлять свои поля именованные не «стандартно»: column_name in ('key', 'myfield')
    3. Обратите внимание на limit 10 для больших баз таблиц нужно его уменьшать вплоть до 1 — смена типа требует времени и не малого
    4. Запрос нужно запускать несколько раз, каждый раз он будет находить оставшиеся не переведенные поля


    Шаг 2 — Перевод функциональных индексов в которых есть прямое указание типа


    Вообще если этого не сделать — принесет проблем в будущем, их крайне трудно обнаружить: выдает ошибку которую не видно в исполняемом запросе.

    DROP FUNCTION IF EXISTS "index_int2big" (idx text, declare_idx text);
    CREATE OR REPLACE FUNCTION "index_int2big" (idx text, declare_idx text) RETURNS text AS 
    $body$  
      DECLARE 
      new_idx text;
      BEGIN 
      EXECUTE 'DROP INDEX IF EXISTS ' || idx;
      SELECT replace(declare_idx, 'integer', 'bigint') INTO new_idx;
      EXECUTE new_idx ;
      return new_idx;
      END;  
    $body$  LANGUAGE 'plpgsql';
    
    select *, index_int2big(indname, inddef) from 
    (SELECT n.nspname as table_schema, c.relname as table_name, c2.relname AS indname, i.indisprimary, 
    i.indisunique, i.indisclustered, pg_catalog.pg_get_indexdef(i.indexrelid, 0, true) AS inddef
    FROM pg_catalog.pg_class c, pg_catalog.pg_class c2, pg_catalog.pg_index i, pg_namespace n
    WHERE n.oid=c.relnamespace and c.oid = i.indrelid AND i.indexrelid = c2.oid
    and n.nspname in ('bucardo', 'public')
    and position('integer' in pg_catalog.pg_get_indexdef(i.indexrelid, 0, true))>0
    limit 10 offset 0) c
    

    Вы можете заметить что я проверяю в схеме bucardo. Если у вас уже работает синхронизация на основе этой технологии, тогда этот шаг стает крайне важным. Также смотрите замечание с прошлого шага.

    Шаг 3 — Создание новых последовательностей для всех ключевых полей


    В предложенном Instagram варианте используется уникальное число для каждой схемы/сервера.
    т.е. необходимо иметь в каждой схеме свою функцию с своим уникальным номером.
    Я немного изменил функцию и генерирую уникальный ключ по IP сервера.

    CREATE OR REPLACE FUNCTION inet2num(inet) RETURNS numeric AS $$ 
    DECLARE 
        a  text[] := string_to_array(host($1), '.'); 
    BEGIN 
        RETURN a[1]::numeric * 16777216 + 
               a[2]::numeric * 65536 + 
               a[3]::numeric * 256 + 
               a[4]::numeric; 
    END; 
    $$ LANGUAGE plpgsql IMMUTABLE STRICT; 
    
    
    DROP FUNCTION IF EXISTS next_id(tbl text, tableschema text);
    CREATE OR REPLACE FUNCTION next_id(tbl text, tableschema text = 'public') returns bigint AS $$
    DECLARE
        our_epoch bigint := 1314220021721;
        seq_id bigint;
        now_millis bigint;
        shard_id bigint;
        result bigint;
    BEGIN
        SELECT nextval(tableschema||'."' || tbl || '_id_seq"') % 1024 INTO seq_id;
        /* select substring(regexp_replace(md5(current_database()||inet_server_addr()||version()), '[^\\\d]+', '', 'g')::text from 1 for 6)::int into shard_id;*/
        SELECT inet2num(inet_server_addr()) into shard_id;
        SELECT FLOOR(EXTRACT(EPOCH FROM clock_timestamp()) * 1000) INTO now_millis;
        result := (now_millis - our_epoch) << 23;
        result := result | (shard_id << 10);
        result := result | (seq_id);
        RETURN result;
    END;
    $$ LANGUAGE PLPGSQL;
    


    Теперь нам осталось вызвать ее на всех наших идентификаторах:
    DROP FUNCTION IF EXISTS "reset_nextid" (tablename text, tableschema text);
    CREATE OR REPLACE FUNCTION "reset_nextid" (tablename text, tableschema text) RETURNS bigint AS 
    $body$  
      DECLARE 
        id_type text;
      BEGIN 
      SELECT data_type from information_schema.columns c where "table_schema"=tableschema and "table_name"=tablename and column_name='id' INTO id_type;
      IF id_type <> 'bigint' THEN
        EXECUTE 'ALTER TABLE '|| tableschema || '."'|| tablename || '" ALTER COLUMN id TYPE bigint;' ;
      END IF;
    
      EXECUTE 'ALTER TABLE '|| tableschema || '."'|| tablename || '" ALTER COLUMN id SET DEFAULT next_id('''|| tablename || ''', '''|| tableschema || ''');';
      return next_id(tablename, tableschema);
      END;  
    $body$  LANGUAGE 'plpgsql';
    
    select t.*, reset_nextid(table_name, table_schema) from (
      select t.table_schema, t.table_name, sequence_name, c.column_name from information_schema.sequences s
      left join information_schema.tables t on split_part(sequence_name, '_id_seq',1)=table_name
      left join information_schema.columns c on (t.table_schema=c.table_schema and t.table_name=c.table_name and c.column_name='id')
      where c.column_name is not null and position('next_id' in c.column_default)<>1 and s.sequence_schema=t.table_schema 
      and t.table_schema in ('public', 'acc') 
      order by t.table_schema, t.table_name limit 50 offset 0
    ) as t
    


    Т.к. мы уже сменили тип идентификатора на bigint — запрос должен отработать быстро.

    На этом подготовку закончили, наша база работает и готова работать параллельно.

    Бонус.

    Если что то пошло не так с next_id, можете вернутся на стандартные последовательности:
    CREATE OR REPLACE FUNCTION "restore_nextval" (tablename text, tableschema text = 'public') RETURNS bigint AS 
    $body$  
      DECLARE 
      BEGIN 
      EXECUTE 'ALTER TABLE '|| tableschema || '."'|| tablename || '" ALTER COLUMN id SET DEFAULT nextval('''|| tablename || '_id_seq''::regclass);';
      return nextval((tableschema || '."'|| tablename || '_id_seq"')::regclass);
      END;  
    $body$  LANGUAGE 'plpgsql';
    
    select t.*, restore_nextval(table_name, table_schema) from (
      select t.table_schema, t.table_name, sequence_name, c.column_name from information_schema.sequences s
      left join information_schema.tables t on split_part(sequence_name, '_id_seq',1)=table_name
      left join information_schema.columns c on (t.table_schema=c.table_schema and t.table_name=c.table_name and c.column_name='id')
      where c.column_name is not null and position('next_id' in c.column_default)>0 and s.sequence_schema=t.table_schema 
      and t.table_schema in ('public', 'acc') 
    


    Спасибо. Надеюсь кому-то было полезно.

    P.S. Не пытайтесь это делать на 32 битном сервере. Вначале обновите сервер. И сервер приложений тоже.