The NOTIFY mechanism in PostgreSQL/LightDB: a lifesaver for reliable caches and transactional MQ messages
2022/9/11 2:23:20
http://www.light-pg.com/docs/lightdb/13.3-22.2/sql-notify.html
http://www.light-pg.com/docs/lightdb/13.3-22.2/sql-listen.html
https://wiki.postgresql.org/wiki/PgNotificationHelper
https://jdbc.postgresql.org/documentation/head/listennotify.html
https://tapoueh.org/blog/2018/07/postgresql-listen-notify/
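Just before the commit is recorded in clog (pg_xact), the kernel appends the transaction's pending notifications to the notification queue, as shown below: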
xact.c
static void
CommitTransaction(void)
{
    ......
    /*
     * Insert notifications sent by NOTIFY commands into the queue.  This
     * should be late in the pre-commit sequence to minimize time spent
     * holding the notify-insertion lock.  However, this could result in
     * creating a snapshot, so we must do it before serializable cleanup.
     */
    PreCommit_Notify();
    ......
async.c implements the NOTIFY machinery:
/*
 * PreCommit_Notify
 *
 *      This is called at transaction commit, before actually committing to
 *      clog.
 *
 *      If there are pending LISTEN actions, make sure we are listed in the
 *      shared-memory listener array.  This must happen before commit to
 *      ensure we don't miss any notifies from transactions that commit
 *      just after ours.
 *
 *      If there are outbound notify requests in the pendingNotifies list,
 *      add them to the global queue.  We do that before commit so that
 *      we can still throw error if we run out of queue space.
 */
void
PreCommit_Notify(void)
{
    ListCell   *p;

    if (!pendingActions && !pendingNotifies)
        return;                 /* no relevant statements in this xact */

    if (Trace_notify)
        elog(DEBUG1, "PreCommit_Notify");

    /* Preflight for any pending listen/unlisten actions */
    if (pendingActions != NULL)
    {
        foreach(p, pendingActions->actions)
        {
            ListenAction *actrec = (ListenAction *) lfirst(p);

            switch (actrec->action)
            {
                case LISTEN_LISTEN:
                    Exec_ListenPreCommit();
                    break;
                case LISTEN_UNLISTEN:
                    /* there is no Exec_UnlistenPreCommit() */
                    break;
                case LISTEN_UNLISTEN_ALL:
                    /* there is no Exec_UnlistenAllPreCommit() */
                    break;
            }
        }
    }

    /* Queue any pending notifies (must happen after the above) */
    if (pendingNotifies)
    {
        ListCell   *nextNotify;

        /*
         * Make sure that we have an XID assigned to the current transaction.
         * GetCurrentTransactionId is cheap if we already have an XID, but not
         * so cheap if we don't, and we'd prefer not to do that work while
         * holding NotifyQueueLock.
         */
        (void) GetCurrentTransactionId();

        /*
         * Serialize writers by acquiring a special lock that we hold till
         * after commit.  This ensures that queue entries appear in commit
         * order, and in particular that there are never uncommitted queue
         * entries ahead of committed ones, so an uncommitted transaction
         * can't block delivery of deliverable notifications.
         *
         * We use a heavyweight lock so that it'll automatically be released
         * after either commit or abort.  This also allows deadlocks to be
         * detected, though really a deadlock shouldn't be possible here.
         *
         * The lock is on "database 0", which is pretty ugly but it doesn't
         * seem worth inventing a special locktag category just for this.
         * (Historical note: before PG 9.0, a similar lock on "database 0" was
         * used by the flatfiles mechanism.)
         */
        LockSharedObject(DatabaseRelationId, InvalidOid, 0,
                         AccessExclusiveLock);

        /* Now push the notifications into the queue */
        backendHasSentNotifications = true;

        nextNotify = list_head(pendingNotifies->events);
        while (nextNotify != NULL)
        {
            /*
             * Add the pending notifications to the queue.  We acquire and
             * release NotifyQueueLock once per page, which might be overkill
             * but it does allow readers to get in while we're doing this.
             *
             * A full queue is very uncommon and should really not happen,
             * given that we have so much space available in the SLRU pages.
             * Nevertheless we need to deal with this possibility. Note that
             * when we get here we are in the process of committing our
             * transaction, but we have not yet committed to clog, so at this
             * point in time we can still roll the transaction back.
             */
            LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
            asyncQueueFillWarning();
            if (asyncQueueIsFull())
                ereport(ERROR,
                        (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                         errmsg("too many notifications in the NOTIFY queue")));
            nextNotify = asyncQueueAddEntries(nextNotify);
            LWLockRelease(NotifyQueueLock);
        }
    }
}
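The ordering that PreCommit_Notify relies on (NOTIFY only records an event locally, and the shared queue is touched at pre-commit, while a failure can still abort the transaction) can be illustrated with a small stand-alone model. This is a sketch only, not PostgreSQL code: every name in it (`SharedQueue`, `Transaction`, `tx_notify`, `tx_commit`, `tx_abort`, `queue_count_for`, and the two size limits) is hypothetical, and it simplifies the real behaviour by checking for space once up front instead of inserting page by page under NotifyQueueLock.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

#define QUEUE_CAPACITY 4    /* hypothetical; tiny so the "queue full" path is easy to hit */
#define MAX_PENDING    8    /* hypothetical per-transaction limit */

typedef struct {
    const char *channel;
    const char *payload;
} Notification;

/* Stand-in for the shared notification queue. */
typedef struct {
    Notification entries[QUEUE_CAPACITY];
    size_t       count;
} SharedQueue;

/* Stand-in for transaction-local state (the role pendingNotifies plays). */
typedef struct {
    Notification pending[MAX_PENDING];
    size_t       npending;
} Transaction;

/* NOTIFY inside a transaction: remember the event, publish nothing yet. */
static bool tx_notify(Transaction *tx, const char *channel, const char *payload)
{
    if (tx->npending >= MAX_PENDING)
        return false;
    tx->pending[tx->npending].channel = channel;
    tx->pending[tx->npending].payload = payload;
    tx->npending++;
    return true;
}

/*
 * Commit: the pre-commit step moves pending events into the shared queue.
 * If they do not fit, the commit fails and the queue is left untouched,
 * which models "we can still roll the transaction back".
 */
static bool tx_commit(Transaction *tx, SharedQueue *q)
{
    assert(q->count <= QUEUE_CAPACITY);
    bool fits = tx->npending <= QUEUE_CAPACITY - q->count;

    if (fits)
        for (size_t i = 0; i < tx->npending; i++)
            q->entries[q->count++] = tx->pending[i];
    tx->npending = 0;           /* local state is cleared either way */
    return fits;
}

/* Abort: pending events are simply dropped and never reach the queue. */
static void tx_abort(Transaction *tx)
{
    tx->npending = 0;
}

/* What a listener on one channel would see in the queue. */
static size_t queue_count_for(const SharedQueue *q, const char *channel)
{
    size_t n = 0;

    for (size_t i = 0; i < q->count; i++)
        if (strcmp(q->entries[i].channel, channel) == 0)
            n++;
    return n;
}
```

In this model, calling `tx_notify` followed by `tx_abort` leaves the queue empty, while `tx_notify` followed by a successful `tx_commit` makes the event visible to `queue_count_for`. That is the property a cache-invalidation or MQ-forwarding listener depends on: it never observes an event from a transaction that did not commit.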
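After RecordTransactionCommit() has flushed the commit WAL record to pg_wal and updated the transaction's commit status in pg_xact, AtCommit_Notify is called. In the version shown here it applies the pending LISTEN/UNLISTEN actions and clears the transaction-local state; the signals that wake listening backends are sent afterwards, by ProcessCompletedNotifies, once the transaction has finished. The call site looks like this: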
    smgrDoPendingDeletes(true);
    AtCommit_Notify();
    AtEOXact_GUC(true, 1);
    AtEOXact_SPI(true);
In async.c:
/*
 * AtCommit_Notify
 *
 *      This is called at transaction commit, after committing to clog.
 *
 *      Update listenChannels and clear transaction-local state.
 */
void
AtCommit_Notify(void)
{
    ListCell   *p;

    /*
     * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
     * return as soon as possible
     */
    if (!pendingActions && !pendingNotifies)
        return;

    if (Trace_notify)
        elog(DEBUG1, "AtCommit_Notify");

    /* Perform any pending listen/unlisten actions */
    if (pendingActions != NULL)
    {
        foreach(p, pendingActions->actions)
        {
            ListenAction *actrec = (ListenAction *) lfirst(p);

            switch (actrec->action)
            {
                case LISTEN_LISTEN:
                    Exec_ListenCommit(actrec->channel);
                    break;
                case LISTEN_UNLISTEN:
                    Exec_UnlistenCommit(actrec->channel);
                    break;
                case LISTEN_UNLISTEN_ALL:
                    Exec_UnlistenAllCommit();
                    break;
            }
        }
    }

    /* If no longer listening to anything, get out of listener array */
    if (amRegisteredListener && listenChannels == NIL)
        asyncQueueUnregister();

    /* And clean up */
    ClearPendingActionsAndNotifies();
}
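The effect of AtCommit_Notify on a session's channel list can also be modelled in a few lines of stand-alone C. Again this is only a sketch under stated assumptions: `Session`, `Action`, `queue_action`, `is_listening`, `at_commit`, `at_abort` and the fixed array sizes are hypothetical names invented for illustration, and the model ignores the shared-memory listener array entirely. It shows only that LISTEN and UNLISTEN are recorded as pending actions and take effect when the transaction commits.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

#define MAX_CHANNELS 8      /* hypothetical limits for the model */
#define MAX_ACTIONS  8

typedef enum { ACT_LISTEN, ACT_UNLISTEN, ACT_UNLISTEN_ALL } ActionKind;

typedef struct {
    ActionKind  kind;
    const char *channel;    /* unused for ACT_UNLISTEN_ALL */
} Action;

typedef struct {
    const char *channels[MAX_CHANNELS];  /* committed subscriptions */
    size_t      nchannels;
    Action      pending[MAX_ACTIONS];    /* actions of the open transaction */
    size_t      npending;
} Session;

static bool is_listening(const Session *s, const char *channel)
{
    for (size_t i = 0; i < s->nchannels; i++)
        if (strcmp(s->channels[i], channel) == 0)
            return true;
    return false;
}

/* LISTEN / UNLISTEN inside a transaction: only record the intent. */
static bool queue_action(Session *s, ActionKind kind, const char *channel)
{
    if (s->npending >= MAX_ACTIONS)
        return false;
    s->pending[s->npending].kind = kind;
    s->pending[s->npending].channel = channel;
    s->npending++;
    return true;
}

static void remove_channel(Session *s, const char *channel)
{
    for (size_t i = 0; i < s->nchannels; i++)
    {
        if (strcmp(s->channels[i], channel) == 0)
        {
            /* Fill the hole with the last entry; order does not matter here. */
            s->channels[i] = s->channels[--s->nchannels];
            return;
        }
    }
}

/* Commit: apply the pending actions in order, then clear them. */
static void at_commit(Session *s)
{
    for (size_t i = 0; i < s->npending; i++)
    {
        const Action *a = &s->pending[i];

        switch (a->kind)
        {
            case ACT_LISTEN:
                if (!is_listening(s, a->channel) && s->nchannels < MAX_CHANNELS)
                    s->channels[s->nchannels++] = a->channel;
                break;
            case ACT_UNLISTEN:
                remove_channel(s, a->channel);
                break;
            case ACT_UNLISTEN_ALL:
                s->nchannels = 0;
                break;
        }
    }
    s->npending = 0;
}

/* Abort: pending actions are discarded, subscriptions stay as they were. */
static void at_abort(Session *s)
{
    s->npending = 0;
}
```

With this model, `queue_action(&s, ACT_LISTEN, "cache_invalidate")` followed by `at_abort(&s)` leaves `is_listening` false, while the same action followed by `at_commit(&s)` makes it true.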