PostgreSQLLa base de données la plus sophistiquée au monde.
Documentation PostgreSQL 10.23 » Programmation serveur » Décodage logique (Logical Decoding) » Plugins de sortie de décodage logique

48.6. Plugins de sortie de décodage logique

Un exemple de plugin de sortie peut être trouvé dans le sous-répertoire contrib/test_decoding de l'arborescence du code source de PostgreSQL.

48.6.1. Fonction d'initialisation

Un plugin de sortie est chargé en chargeant dynamiquement une bibliothèque partagée avec comme nom de base le nom du plugin de sortie. Le chemin de recherche de bibliothèque habituel est utilisé pour localiser cette bibliothèque. Pour fournir les callbacks de plugins de sortie requis et pour indiquer que la bibliothèque est effectivement un plugin de sortie, elle doit fournir une fonction nommée _PG_output_plugin_init. Une structure est passée à cette fonction qui doit la remplir avec les pointeurs des fonctions de callback pour chaque action individuelle.

typedef struct OutputPluginCallbacks
{
    LogicalDecodeStartupCB startup_cb;
    LogicalDecodeBeginCB begin_cb;
    LogicalDecodeChangeCB change_cb;
    LogicalDecodeCommitCB commit_cb;
    LogicalDecodeMessageCB message_cb;
    LogicalDecodeFilterByOriginCB filter_by_origin_cb;
    LogicalDecodeShutdownCB shutdown_cb;
} OutputPluginCallbacks;

typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
    

Les callbacks begin_cb, change_cb et commit_cb sont obligatoires, alors que startup_cb, filter_by_origin_cb et shutdown_cb sont facultatifs.

48.6.2. Capacités

Pour décoder, formater et sortir les changements, les plugins de sortie peuvent utiliser une grande partie de l'infrastructure habituelle des processus clients, y compris l'appel aux fonctions de sortie. Les accès en lecture seule aux relations est permis du moment que les relations accédées ont été créées par initdb dans le schéma pg_catalog, ou ont été marqués comme tables du catalogue pour l'utilisateur en utilisant :

ALTER TABLE table_catalogue_utilisateur SET (user_catalog_table = true);
CREATE TABLE autre_table_catalogue(data text) WITH (user_catalog_table = true);
    

Toute action amenant à une affectation d'identifiant de transaction est interdite. Cela inclut, entre autres, l'écriture dans des tables, l'exécution de changements DDL et l'appel à txid_current().

48.6.3. Modes de sortie

Les fonctions callbacks des plugins en sortie peuvent renvoyer des données au consommateur dans des formats pratiquement arbitraires. Pour certains cas d'utilisation, comme la visualisation des changements en SQL, le renvoi des données dans un type de données qui peut contenir des données arbitraires (par exemple du bytea) est complexe. Si le plugin en sortie renvoit seulement les données au format texte dans l'encodage du serveur, il peut déclarer cela en configurant OutputPluginOptions.output_type à OUTPUT_PLUGIN_TEXTUAL_OUTPUT au lieu de OUTPUT_PLUGIN_BINARY_OUTPUT dans la fonction callback de démarrage. Dans ce cas, toutes les données doivent être dans l'encodage du serveur pour qu'un champ de type text puisse les contenir. Ceci est vérifié dans les constructions comprenant les assertions.

48.6.4. Callbacks de plugin de sortie

Un plugin de sortie est notifié des changements arrivant au travers de différents callbacks qu'il doit fournir.

Les transactions concurrentes sont décodées dans l'ordre dans lequel elles sont validées, et seuls les changements appartenant à une transaction spécifique sont décodés entre les callbacks begin et commit. Les transactions qui ont été explicitement ou implicitement annulées ne sont jamais décodées. Les savepoints validés sont inclus dans la transaction les contenant, dans l'ordre dans lequel ils ont été effectués dans la transaction.

Note

Seules les transactions qui ont été synchronisées sur disque de manière sûre seront décodées. Cela peut amener à ce qu'un COMMIT ne soit pas immédiatement décodé lors d'un appel à pg_logical_slot_get_changes() juste après celui-ci quand synchronous_commit est positionné à off.

48.6.4.1. Callback de démarrage

Le callback facultatif startup_cb est appelé chaque fois qu'un slot de réplication est créé ou qu'on lui demande de fournir les flux de changement, indépendamment du nombre de changements qui sont prêt à être fournis.

typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx,
                                        OutputPluginOptions *options,
                                        bool is_init);
     

Le paramètre is_init sera positioné à true quand le slot de réplication est créé, et à false sinon. options pointe vers une structure d'options que le plugin de sortie peut positionner :

typedef struct OutputPluginOptions
{
    OutputPluginOutputType output_type;
} OutputPluginOptions;
     

output_type doit être positionné soit à OUTPUT_PLUGIN_TEXTUAL_OUTPUT ou à OUTPUT_PLUGIN_BINARY_OUTPUT. Voir aussi Section 48.6.3.

Le callback de démarrage devrait valider les options présentes dans ctx->output_plugin_options. Si le plugin de sortie a besoin d'avoir un état, il peut utiliser ctx->output_plugin_private pour le stocker.

48.6.4.2. Callback d'arrêt

Le callback facultatif shutdown_cb est appelé chaque fois qu'un slot de réplication anciennement actif n'est plus utilisé et peut être utilisé pour désallouer les ressources privées du plugin de sortie. Le slot n'est pas nécessairement supprimé, le flux est juste arrêté.

typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
     

48.6.4.3. Callback de début de transaction

Le callback obligatoire begin_cb est appelé chaque fois que le début d'une transaction validée a été décodé. Les transactions annulées et leur contenu ne sont pas décodés.

typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx,
                                      ReorderBufferTXN *txn);
     

Le paramètre txn contient des métadonnées sur la transaction, comme l'heure à laquelle elle a été validée et son XID.

48.6.4.4. Callback de fin de transaction

Le callback obligatoire commit_cb est appelé chaque fois qu'une transaction validée a été décodée. Le callback change_cb aura été appelé avant cela pour chacune des lignes modifiées, s'il y en a eu.

typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
                                       ReorderBufferTXN *txn,
                                       XLogRecPtr commit_lsn);
     

48.6.4.5. Callback de modification

Le callback obligatoire change_cb est appelé pour chacune des modifications de ligne au sein d'une transaction, qu'il s'agisse d'un INSERT, UPDATE ou DELETE. Même si la commande d'origine a modifié plusieurs ligne en une seule instruction, le callback sera appelé pour chaque ligne individuellement.

typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
                                       ReorderBufferTXN *txn,
                                       Relation relation,
                                       ReorderBufferChange *change);
     

Les paramètres ctx et txn ont le même contenu que pour les callbacks begin_cb et commit_cb, mais en plus le descripteur de relation relation pointe vers la relation à laquelle appartient la ligne et une structure change décrivant les modifications de ligne y est passée.

Note

Seules les changements dans les tables définies par les utilisateurs qui sont journalisées (voir UNLOGGED) et non temporaires (voir TEMPORARY ou TEMP) peuvent être extraite avec le décodage logique.

48.6.4.6. Fonction de filtre sur l'origine

La fonction optionnelle filter_by_origin_cb est appelée pour déterminer si les données rejouées à partir de origin_id ont un intérêt pour le plugin de sortie.

typedef bool (*LogicalDecodeFilterByOriginCB) (
    struct LogicalDecodingContext *ctx,
    RepNodeId origin_id
);
     

Le paramètre ctx a le même contenu que pour les autres fonctions. Aucune information mais l'originie est disponible. Pour signaler que les changements provenant du nœud sont hors de propos, elle renvoie true, ce qui permet de les filtrer. Elle renvoie false dans les autres cas. Les autres fonctions ne seront pas appelées pour les transactions et changements qui ont été filtrées.

Ceci est utile pour implémenter des solutions de réplication en cascade ou des solutions de réplication multi-directionnelles. Filtrer par rapport à l'origine perment d'empêcher la réplication dans les deux sesns des mêmes modifications dans ce type de configuration. Quand les transactions et les modifications contiennent aussi des informations sur l'origine, le filtre via cette fonction est beaucoup plus efficace.

48.6.4.7. Fonctions personnalisées de message générique

La fonction (callback) message_cb est appelée quand un message de décodage logique a été décodé.

typedef void (*LogicalDecodeMessageCB) (
    struct LogicalDecodingContext *,
    ReorderBufferTXN *txn,
    XLogRecPtr message_lsn,
    bool transactional,
    const char *prefix,
    Size message_size,
    const char *message
);
     

Le paramètre txn contient des méta-informations sur la transaction, comme l'horodatage à laquelle la transaction a été validée et son identifiant (XID). Notez néanmoins qu'il peut être NULL quand le message n'est pas transactionnel et que le XID n'a pas encore été affecté dans la transaction qui a tracé le message. Le lsn a la position du message dans les WAL. Le paramètre transactional indique si le message a été envoyé de façon transactionnelle ou non. Le paramètre prefix est un préfix arbitraire terminé par un un caractère nul qui peut être utilisé pour identifier les messages intéressants pour le plugin courant. Et enfin, le paramètre message détient le message réel de taille message_size.

Une attention particulière doit être portée à l'unicité du préfixe que le plugin de sortie trouve intéressant. Utiliser le nom de l'extension ou du plugin de sortie est souvent un bon choix.

48.6.5. Fonction pour produire une sortie

Pour pouvoir produire une sortie, les plugins de sortie peuvent écrire des données dans le tampon de sortie StringInfo dans ctx->out dans les callbacks begin_cb, commit_cb ou change_cb. Avant d'écrire dans le tampon de sortie, OutputPluginWrite(ctx, last_write) doit avoir été appelé pour effectuer l'écriture. last_write indique si une écriture particuli_re était la dernière écriture du callback.

L'exemple suivant montre comment sortir des données pour le consommateur d'un plugin de sortie :

OutputPluginPrepareWrite(ctx, true);
appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
OutputPluginWrite(ctx, true);