Un exemple de plugin de sortie peut être trouvé dans le sous-répertoire
contrib/test_decoding
de l'arborescence du code source de PostgreSQL.
Un plugin de sortie est chargé en chargeant dynamiquement une
bibliothèque partagée avec comme nom de base le nom du plugin de sortie.
Le chemin de recherche de bibliothèque habituel est utilisé pour
localiser cette bibliothèque. Pour fournir les callbacks de plugins de
sortie requis et pour indiquer que la bibliothèque est effectivement un
plugin de sortie, elle doit fournir une fonction nommée
_PG_output_plugin_init
. Une structure est passée à cette
fonction qui doit la remplir avec les pointeurs des fonctions de callback
pour chaque action individuelle.
typedef struct OutputPluginCallbacks { LogicalDecodeStartupCB startup_cb; LogicalDecodeBeginCB begin_cb; LogicalDecodeChangeCB change_cb; LogicalDecodeTruncateCB truncate_cb; LogicalDecodeCommitCB commit_cb; LogicalDecodeMessageCB message_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; } OutputPluginCallbacks; typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
Les callbacks begin_cb
, change_cb
et commit_cb
sont obligatoires,
alors que startup_cb
,
filter_by_origin_cb
, truncate_cb
et shutdown_cb
sont facultatifs. Si
truncate_cb
n'est pas configuré mais que
TRUNCATE
doit être décodé, l'action sera ignoré.
Pour décoder, formater et sortir les changements, les plugins de sortie
peuvent utiliser une grande partie de l'infrastructure habituelle des
processus clients, y compris l'appel aux fonctions de sortie. Les accès
en lecture seule aux relations est permis du moment que les relations
accédées ont été créées par initdb
dans le schéma
pg_catalog
, ou ont été marqués comme tables du
catalogue pour l'utilisateur en utilisant :
ALTER TABLE table_catalogue_utilisateur SET (user_catalog_table = true); CREATE TABLE autre_table_catalogue(data text) WITH (user_catalog_table = true);
Toute action amenant à une affectation d'identifiant de transaction est
interdite. Cela inclut, entre autres, l'écriture dans des tables,
l'exécution de changements DDL et l'appel à txid_current()
.
Les fonctions callbacks des plugins en sortie peuvent renvoyer des données
au consommateur dans des formats pratiquement arbitraires. Pour certains
cas d'utilisation, comme la visualisation des changements en SQL, le
renvoi des données dans un type de données qui peut contenir des données
arbitraires (par exemple du bytea
) est complexe. Si le plugin en sortie
renvoit seulement les données au format texte dans l'encodage du serveur,
il peut déclarer cela en configurant
OutputPluginOptions.output_type
à
OUTPUT_PLUGIN_TEXTUAL_OUTPUT
au lieu de
OUTPUT_PLUGIN_BINARY_OUTPUT
dans la
fonction callback de
démarrage. Dans ce cas, toutes les données doivent être dans
l'encodage du serveur pour qu'un champ de type text
puisse les
contenir. Ceci est vérifié dans les constructions comprenant les assertions.
Un plugin de sortie est notifié des changements arrivant au travers de différents callbacks qu'il doit fournir.
Les transactions concurrentes sont décodées dans l'ordre dans lequel
elles sont validées, et seuls les changements appartenant à une
transaction spécifique sont décodés entre les callbacks
begin
et commit
. Les transactions
qui ont été explicitement ou implicitement annulées ne sont jamais
décodées. Les savepoints validés
sont inclus dans la transaction les contenant, dans l'ordre dans lequel
ils ont été effectués dans la transaction.
Seules les transactions qui ont été synchronisées sur disque de manière
sûre seront décodées. Cela peut amener à ce qu'un COMMIT
ne soit pas
immédiatement décodé lors d'un appel à
pg_logical_slot_get_changes()
juste après celui-ci
quand synchronous_commit
est positionné à
off
.
Le callback facultatif startup_cb
est appelé chaque
fois qu'un slot de réplication est créé ou qu'on lui demande de fournir
les flux de changement, indépendamment du nombre de changements qui sont
prêt à être fournis.
typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx, OutputPluginOptions *options, bool is_init);
Le paramètre is_init
sera positioné à true quand le
slot de réplication est créé, et à false sinon.
options
pointe vers une structure d'options
que le plugin de sortie peut positionner :
typedef struct OutputPluginOptions { OutputPluginOutputType output_type; bool receive_rewrites; } OutputPluginOptions;
output_type
doit être positionné soit à
OUTPUT_PLUGIN_TEXTUAL_OUTPUT
ou à
OUTPUT_PLUGIN_BINARY_OUTPUT
. Voir aussi Section 49.6.3.
Si receive_rewrites
vaut true, le plugin de sortie
sera aussi appelé pour les modifications réalisées par des réécritures du
fichier HEAP lors de certaines opérations DDL. Ceci est intéressant pour
les plugins qui gèrent la réplication DDL mais ils nécessitent une
gestion particulière.
Le callback de démarrage devrait valider les options présentes dans
ctx->output_plugin_options
. Si le plugin de
sortie a besoin d'avoir un état, il peut utiliser
ctx->output_plugin_private
pour le stocker.
Le callback facultatif shutdown_cb
est appelé
chaque fois qu'un slot de réplication anciennement actif n'est plus
utilisé et peut être utilisé pour désallouer les ressources privées
du plugin de sortie. Le slot n'est pas nécessairement supprimé, le
flux est juste arrêté.
typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
Le callback obligatoire begin_cb
est appelé chaque
fois que le début d'une transaction validée a été décodé. Les
transactions annulées et leur contenu ne sont pas décodés.
typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn);
Le paramètre txn
contient des métadonnées sur la
transaction, comme l'heure à laquelle elle a été validée et son XID.
Le callback obligatoire commit_cb
est appelé chaque
fois qu'une transaction validée a été décodée. Le callback
change_cb
aura été appelé avant cela pour chacune
des lignes modifiées, s'il y en a eu.
typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
Le callback obligatoire change_cb
est appelé pour
chacune des modifications de ligne au sein d'une transaction, qu'il
s'agisse d'un INSERT
, UPDATE
ou DELETE
. Même si la commande d'origine a modifié
plusieurs ligne en une seule instruction, le callback sera appelé
pour chaque ligne individuellement.
typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change);
Les paramètres ctx
et txn
ont le même contenu que pour les callbacks begin_cb
et commit_cb
, mais en plus le descripteur de relation
relation
pointe vers la relation à laquelle
appartient la ligne et une structure change
décrivant les modifications de ligne y est passée.
Seules les changements dans les tables définies par les utilisateurs qui
sont journalisées (voir UNLOGGED
)
et non temporaires
(voir TEMPORARY
ou TEMP
) peuvent être extraite
avec le décodage logique.
La fonction callback truncate_cb
est appelée pour la
commande TRUNCATE
.
typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change);
Les paramètres sont identiques à ceux du callback
change_cb
. Néanmoins, comme les actions du
TRUNCATE
sur les tables liées par clés étrangères
doivent être exécutées ensembles, ce callback reçoit un tableau de
relations au lieu d'une seule relation. Voir la description de
l'instruction TRUNCATE pour les détails.
La fonction optionnelle filter_by_origin_cb
est
appelée pour déterminer si les données rejouées à partir de
origin_id
ont un intérêt pour le plugin de
sortie.
typedef bool (*LogicalDecodeFilterByOriginCB) ( struct LogicalDecodingContext *ctx, RepNodeId origin_id );
Le paramètre ctx
a le même contenu que pour les
autres fonctions. Aucune information mais l'originie est disponible. Pour
signaler que les changements provenant du nœud sont hors de propos, elle
renvoie true, ce qui permet de les filtrer. Elle renvoie false dans les
autres cas. Les autres fonctions ne seront pas appelées pour les
transactions et changements qui ont été filtrées.
Ceci est utile pour implémenter des solutions de réplication en cascade ou des solutions de réplication multi-directionnelles. Filtrer par rapport à l'origine perment d'empêcher la réplication dans les deux sesns des mêmes modifications dans ce type de configuration. Quand les transactions et les modifications contiennent aussi des informations sur l'origine, le filtre via cette fonction est beaucoup plus efficace.
La fonction (callback) message_cb
est appelée quand
un message de décodage logique a été décodé.
typedef void (*LogicalDecodeMessageCB) ( struct LogicalDecodingContext *, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message );
Le paramètre txn
contient des méta-informations sur
la transaction, comme l'horodatage à laquelle la transaction a été validée
et son identifiant (XID). Notez néanmoins qu'il peut être NULL quand le
message n'est pas transactionnel et que le XID n'a pas encore été affecté
dans la transaction qui a tracé le message. Le lsn
a la position du message dans les WAL. Le paramètre
transactional
indique si le message a été envoyé
de façon transactionnelle ou non. Le paramètre
prefix
est un préfix arbitraire terminé par un
un caractère nul qui peut être utilisé pour identifier les messages
intéressants pour le plugin courant. Et enfin, le paramètre
message
détient le message réel de taille
message_size
.
Une attention particulière doit être portée à l'unicité du préfixe que le plugin de sortie trouve intéressant. Utiliser le nom de l'extension ou du plugin de sortie est souvent un bon choix.
Pour pouvoir produire une sortie, les plugins de sortie
peuvent écrire des données dans le tampon de sortie StringInfo
dans ctx->out
dans les callbacks
begin_cb
, commit_cb
ou change_cb
. Avant d'écrire dans le tampon de
sortie, OutputPluginWrite(ctx, last_write)
doit avoir
été appelé pour effectuer l'écriture. last_write
indique si une écriture particuli_re était la dernière écriture du
callback.
L'exemple suivant montre comment sortir des données pour le consommateur d'un plugin de sortie :
OutputPluginPrepareWrite(ctx, true); appendStringInfo(ctx->out, "BEGIN %u", txn->xid); OutputPluginWrite(ctx, true);