Continuing from the previous post, this installment explores how to combine PostgreSQL notifications with backpressure to batch tokens efficiently.
Previously, we examined PostgreSQL triggers and their role in managing automated workflows. Although we briefly introduced NOTIFY statements, their full potential has yet to be explored. In this segment, we'll bring notification events into focus and develop a collector to process the tokens accumulating in the database.
Getting Started
Setting Up Your Environment
Clone the tetsuo/mailroom repository:
git clone https://github.com/tetsuo/mailroom.git
Run the following command to create a new database in PostgreSQL:
createdb mailroom
Then, navigate to the migrations folder and run:
psql -d mailroom < 0_init.up.sql
Alternatively, you can use go-migrate, which is often my preference.
Inspect the Initial State of the Database
Before adding any mock data, let's take a look at the initial state of the jobs table:
psql -d mailroom -c "SELECT * FROM jobs;"
You should see one row with job_type set to mailroom and last_seq set to zero:
 job_type | last_seq
----------+----------
 mailroom |        0
(1 row)
Listening for Notifications
In a separate terminal, connect to the database again:
psql -d mailroom
Then enable listening on the token_insert channel:
LISTEN token_insert;
Populate with Sample Data
Now, let's insert some dummy data into the accounts table. This command inserts three records with randomized email and login fields:
printf "%.0sINSERT INTO accounts (email, login) VALUES ('user' || md5(random()::text) || '@fake.mail', 'user' || substr(md5(random()::text), 1, 20));\n" {1..3} | \
psql -d mailroom
Here's an example account record:
-[ ACCOUNT 1 ]-------------------------------------------------------------------
id | 1
email | usere3213152e8cdf722466a011b1eaa3c98@fake.mail
status | provisioned
login | user85341405cb33cbe89a5f
created_at | 1735709763
status_changed_at |
activated_at |
suspended_at |
unsuspended_at |
The corresponding token record generated by the trigger function:
-[ TOKEN 1 ]---------------------------------------------------------------------
id | 1
action | activation
secret | \x144d3ba23d4e60f80d3cb5cf25783539ba267af34aecd71d7cc888643c912fb7
code | 06435
account | 1
expires_at | 1735710663
consumed_at |
created_at | 1735709763
If the notifications haven't appeared yet, try issuing a semicolon (;) in the psql client to display them:
mailroom=# LISTEN token_insert;
LISTEN
mailroom=# ;
Asynchronous notification "token_insert" received from server process with PID 5148.
Asynchronous notification "token_insert" received from server process with PID 5148.
Asynchronous notification "token_insert" received from server process with PID 5148.
Dequeuing Pending Jobs
These notifications indicate that it's time to execute the query we defined in Part 1:
WITH token_data AS (
  SELECT
    t.account,
    t.secret,
    t.code,
    t.expires_at,
    t.id,
    t.action,
    a.email,
    a.login
  FROM
    jobs
    JOIN tokens t
      ON t.id > jobs.last_seq
      AND t.expires_at > EXTRACT(EPOCH FROM NOW())
      AND t.consumed_at IS NULL
      AND t.action IN ('activation', 'password_recovery')
    JOIN accounts a
      ON a.id = t.account
      AND (
        (t.action = 'activation' AND a.status = 'provisioned')
        OR (t.action = 'password_recovery' AND a.status = 'active')
      )
  WHERE
    jobs.job_type = 'mailroom'
  ORDER BY id ASC
  LIMIT 10
),
updated_jobs AS (
  UPDATE
    jobs
  SET
    last_seq = (SELECT MAX(id) FROM token_data)
  WHERE
    EXISTS (SELECT 1 FROM token_data)
  RETURNING last_seq
)
SELECT
  td.action,
  td.email,
  td.login,
  td.secret,
  td.code
FROM
  token_data td
... which accomplishes two things:
- Retrieves tokens generated after the last_seq, along with the corresponding user data.
- Updates the last_seq value to avoid selecting duplicates later.
In other words, this query retrieves a batch of tokens and advances the cursor:
-[ RECORD 1 ]--------------------------------------------------------------
action | activation
email | usere3213152e8cdf722466a011b1eaa3c98@fake.mail
login | user85341405cb33cbe89a5f
secret | \x144d3ba23d4e60f80d3cb5cf25783539ba267af34aecd71d7cc888643c912fb7
code | 06435
-[ RECORD 2 ]--------------------------------------------------------------
action | activation
email | user41e8b6830c76870594161150051f8215@fake.mail
login | user2491d87beb8950b4abd7
secret | \x27100e07220b62e849e788e6554fede60c96e967c4aa62db7dc45150c51be23f
code | 80252
-[ RECORD 3 ]--------------------------------------------------------------
action | activation
email | user7bb11e235c85afe12076884d06910be4@fake.mail
login | user91ab8536cb05c37ff46a
secret | \xa9763eec727835bd97b79018b308613268d9ea0db70493fd212771c9b7c3bcb2
code | 31620
Overview
The collector subscribes to a PostgreSQL notification channel, tracks incoming events, and executes a query when either a row limit (based on the number of received notifications) or a timeout is reached.
Real-time Responsiveness
Instead of continuously polling the database for pending jobs, the collector tracks notifications. This enables it to determine exactly how many rows to dequeue and query the database only when there's actual work to do.
Responsiveness vs. Budget Constraints
However, the collector must also account for constraints imposed by our email provider—and, by extension, our budget. For instance, with Amazon SES charging $0.10 per 1,000 emails, a monthly budget of $100 translates to:
- 1,000,000 emails per month,
- 33,333 emails per day,
- 1,389 emails per hour,
- 23 emails per minute, and
- 0.38 emails per second.
At this rate, we would need to buffer for roughly 26 seconds for batches containing 10 destinations each:
10 emails / 0.38 emails per second ≈ 26.32 seconds
... which assumes we are operating at full capacity within our budget.
Batch Processing and Flow-Control
To accommodate these constraints and avoid overwhelming downstream components (e.g., the email sender), we'll implement a form of backpressure by pulling tokens from the database in controlled batches, fetching more only when the collector is ready.
Here's how it works in practice:
Batch Limit
The maximum number of email destinations in a single batch.
The collector queries the database for at most N tokens at a time (where N is the batch limit). Even if 500 tokens are waiting in the database, the collector will only take, say, 10 at a time. This imposes a hard cap on the throughput of tokens that can leave the database and head downstream.
Batch Timeout
The maximum time to wait for enough notifications to accumulate to fill a batch.
Instead of hammering the database every millisecond or immediately after a few notifications arrive, the collector also waits up to X milliseconds before processing tokens (where X is the batch timeout). If fewer than the batch limit have arrived during that window, the collector will still dequeue whatever did arrive—but it won't keep pulling more immediately. In effect, this sets an upper limit on how long new tokens can linger around before being handed over to the email sender.
Example
If you set:
- A batch timeout of 30 seconds.
- A limit of 10 notifications.
This means:
- If 10 notifications arrive in quick succession, the batch is triggered immediately.
- If fewer than 10 arrive over 30 seconds, the batch is triggered when the timeout ends.
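Putting those two rules together, the flush decision boils down to something like the following sketch. The names here are illustrative (a notification counter, the batch limit, and an elapsed-time check); the collector's actual loop appears later in this post.

#include <stdbool.h>

// Sketch: decide whether the current batch should be flushed.
// 'seen' counts notifications since the last flush; 'elapsed_ms' is the
// time since the first of them arrived.
static bool should_flush(int seen, long elapsed_ms, int batch_limit, long batch_timeout_ms)
{
    if (seen >= batch_limit)
        return true;  // limit reached: flush immediately
    if (seen > 0 && elapsed_ms >= batch_timeout_ms)
        return true;  // timeout expired with work pending
    return false;     // keep waiting
}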
Implementation
The collector is written in C and interacts with PostgreSQL using libpq.
Connecting to PostgreSQL
The query from Part 1 lives in the db.c
file, along with other DB-related functions. When the collector first connects, it issues a LISTEN
command on the specified channel and creates the prepared statements for subsequent queries.
#include <libpq-fe.h>

// Establishes a connection to the database, listens for notifications, and
// creates prepared statements.
bool db_connect(PGconn **conn, const char *conninfo, const char *channel)
{
    *conn = PQconnectdb(conninfo);
    return PQstatus(*conn) == CONNECTION_OK &&
           db_listen(*conn, channel) &&
           db_prepare_statement(*conn, POSTGRES_HEALTHCHECK_PREPARED_STMT_NAME, "SELECT 1") &&
           db_prepare_statement(*conn, POSTGRES_DATA_PREPARED_STMT_NAME, query);
}
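db_listen and db_prepare_statement are small helpers in db.c. They aren't reproduced in this post, but a minimal sketch of what they might look like (the repo's versions may differ) is:

#include <stdbool.h>
#include <stdio.h>
#include <string.h>
#include <libpq-fe.h>

// Sketch: issue LISTEN on the given channel. The channel name is an
// identifier, so it's quoted defensively.
static bool db_listen(PGconn *conn, const char *channel)
{
    char command[256];
    char *quoted = PQescapeIdentifier(conn, channel, strlen(channel));
    if (!quoted)
        return false;
    snprintf(command, sizeof(command), "LISTEN %s", quoted);
    PQfreemem(quoted);

    PGresult *res = PQexec(conn, command);
    bool ok = PQresultStatus(res) == PGRES_COMMAND_OK;
    PQclear(res);
    return ok;
}

// Sketch: create a named prepared statement for later use with
// PQexecPrepared; the server infers parameter types.
static bool db_prepare_statement(PGconn *conn, const char *name, const char *sql)
{
    PGresult *res = PQprepare(conn, name, sql, 0, NULL);
    bool ok = PQresultStatus(res) == PGRES_COMMAND_OK;
    PQclear(res);
    return ok;
}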
Fetching & Serializing Email Payloads
When notifications arrive, the collector fetches tokens in batches and writes the results directly to stdout; processing continues until it has exhausted the queued tokens or an error occurs. The db_dequeue() function handles this logic.
The results are output as line-delimited batches, formatted as comma-separated values in the following order:
action,email,username,secret,code
This field sequence repeats for each row in the batch, and the entire batch is emitted on a single line.
- action: Numeric representation of the email action type (e.g., 1 for activation, 2 for password recovery).
- email: Recipient's email address.
- username: Recipient's login name.
- secret: A base64 URL-encoded string containing the signed token.
- code: Optional numeric code (e.g., for password recovery).
Example
Here, the first line contains a batch of three actions, including both password recovery and account activation actions. The second line contains a single action for account activation.
2,john.doe123@fakemail.test,johndoe,0WEKrnjY_sTEqogrR6qsp7r7Vg4SQ_0iM_1La5hHp5p31nbkrHUBS0Cz9T24iBDCk6CFqO7tJTihpsOVuHYgLg,35866,1,jane.smith456@notreal.example,janesmith,BfQXx31qfY2IJFTtzAp21IdeW0dDIxUT1Ejf3tYJDukNsfaxxOfldwL-lEfVy4SEkZ_v18rf-EWsvWXH5qgvIg,24735,1,emma.jones789@madeup.mail,emmajones,jxrR5p72UWTQ8JiU2DrqjZ-K8L4t8i454S9NtPkVn4-1-bin3ediP0zHMDQU2J_iIyzH4XmNtzpXZhjV0n5xcA,25416
1,sarah.connor999@unreal.mail,resistance1234,zwhCIthd12DqpQSGB57S9Ky-OXV_8H0e8aHOv_kWoggIuAZ2sc-aQVpIoQ-M--PjwVfdIIxiXkv_WjRjGI57zA,38022
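For illustration, here's a simplified sketch of how one row could be appended to a batch line like the ones above. The function name and field handling are hypothetical; the real serialization happens inside db_dequeue().

#include <stdio.h>
#include <string.h>

// Sketch: append one row to the current batch line. Rows are separated by a
// comma, and db_dequeue would emit a single '\n' after the last row.
static void write_csv_row(FILE *out, int first, const char *action,
                          const char *email, const char *login,
                          const char *encoded_secret, const char *code)
{
    // Map the textual action to its numeric representation.
    int action_id = (strcmp(action, "activation") == 0) ? 1 : 2;

    fprintf(out, "%s%d,%s,%s,%s,%s",
            first ? "" : ",", action_id, email, login, encoded_secret, code);
}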
Signing & Verifying Tokens
During the dequeue operation, the token's secret is signed with HMAC-SHA256 and encoded in a URL-safe Base64 format.
The encoded output contains:
- A path name (e.g., /activate or /recover)
- The original secret (and the code, in the case of recovery)
- The resulting cryptographic signature
static size_t construct_signature_data(char *output, const char *action,
                                       const unsigned char *secret, const char *code)
{
    size_t offset = 0;
    if (strcmp(action, "activation") == 0)
    {
        memcpy(output, "/activate", 9); // "/activate" is 9 bytes
        offset = 9;
        memcpy(output + offset, secret, 32);
        offset += 32;
    }
    else if (strcmp(action, "password_recovery") == 0)
    {
        memcpy(output, "/recover", 8); // "/recover" is 8 bytes
        offset = 8;
        memcpy(output + offset, secret, 32);
        offset += 32;
        memcpy(output + offset, code, 5); // code is 5 bytes
        offset += 5;
    }
    return offset; // Total length of the constructed data
}
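And here is a rough sketch of the signing and encoding step itself, using OpenSSL's HMAC and an unpadded URL-safe Base64 alphabet. The payload layout shown (signed data followed by the MAC) is an assumption; verifyHmac.js in the repo is the authoritative reference.

#include <stddef.h>
#include <string.h>
#include <openssl/evp.h>
#include <openssl/hmac.h>

// Sketch: HMAC-SHA256 the constructed data, then base64url-encode it
// together with the MAC (layout is an assumption; see verifyHmac.js).
static size_t sign_and_encode(char *out, const unsigned char *key, size_t key_len,
                              const unsigned char *data, size_t data_len)
{
    unsigned char mac[EVP_MAX_MD_SIZE];
    unsigned int mac_len = 0;
    unsigned char payload[128];
    char b64[256];

    // Sign the constructed data (path + secret [+ code]).
    HMAC(EVP_sha256(), key, (int)key_len, data, data_len, mac, &mac_len);

    memcpy(payload, data, data_len);
    memcpy(payload + data_len, mac, mac_len);

    // Standard Base64 first, then switch to the URL-safe alphabet, no padding.
    int n = EVP_EncodeBlock((unsigned char *)b64, payload, (int)(data_len + mac_len));
    size_t out_len = 0;
    for (int i = 0; i < n; i++)
    {
        char c = b64[i];
        if (c == '+') c = '-';
        else if (c == '/') c = '_';
        else if (c == '=') break; // strip padding
        out[out_len++] = c;
    }
    out[out_len] = '\0';
    return out_len;
}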
This step ensures authenticity checks can happen on the frontend without needing an immediate database call. If you'd like to see how the backend verifies these secrets, there is a verifyHmac.js script in the repo for reference.
- Be sure to handle expired tokens. One approach is to include the expires_at value so you can check validity without a DB lookup (see the sketch below). But if tokens remain valid for 15 minutes, a more thorough approach is to cache consumed tokens until they naturally expire, preventing reuse during their validity period.
- Also, regularly rotate your signing key.
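For instance, a minimal expiry check that avoids a database round trip, assuming expires_at travels alongside the token as a Unix timestamp (illustrative only):

#include <stdbool.h>
#include <time.h>

// Hypothetical helper: reject a token whose embedded expires_at
// (Unix seconds) is already in the past.
static bool token_expired(long expires_at)
{
    return expires_at <= (long)time(NULL);
}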
Putting It All Together
Environment Variables
In main.c, you'll see references to environment variables like MAILROOM_BATCH_TIMEOUT, MAILROOM_BATCH_LIMIT, and MAILROOM_SECRET_KEY (a 64-character hex string). Refer to the README file for the full list.
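A minimal sketch of how these settings might be read at startup; the helper name and defaults below are illustrative, not the repo's exact code:

#include <stdlib.h>

// Sketch: read a numeric setting from the environment, falling back to a
// default when unset. The real main.c may validate and report errors
// differently.
static long env_long(const char *name, long fallback)
{
    const char *value = getenv(name);
    return value ? strtol(value, NULL, 10) : fallback;
}

// Example usage (placeholder defaults):
// long batch_limit   = env_long("MAILROOM_BATCH_LIMIT", 10);
// long timeout_ms    = env_long("MAILROOM_BATCH_TIMEOUT", 5000);
// const char *secret = getenv("MAILROOM_SECRET_KEY"); // 64-character hex string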
Loop Overview
At a high level, the main loop repeatedly:
- Dequeues and processes ready batches
- Checks for new notifications
- Waits on select() for either database activity or a timeout
- Performs periodic health checks
- Reconnects to the database if needed
When the batch limit is reached or the timeout occurs, the collector executes the dequeue query. If it detects a broken connection, it attempts to reconnect and resumes once stable.
Here's the pseudo-code representation:
// 🌟 Main processing loop
WHILE the application is running 🔄

    // 🔌 Handle reconnection if needed
    IF the connection is not ready ❌ THEN
        reconnect to the database 🔄
        initialize the connection ✅
        reset counters 🔢
        CONTINUE to the next iteration ⏩
    END IF

    // 📦 Process ready batches
    IF ready for processing ✅ THEN
        dequeue and process a batch of items 📤
        reset state for the next cycle 🔁
        CONTINUE to the next iteration ⏩
    END IF

    // 🛎️ Handle pending notifications
    process all incoming notifications 📥
    IF notifications exceed the batch limit 🚨 THEN
        mark ready for processing ✅
        CONTINUE to the next iteration ⏩
    END IF

    // ⏱️ Wait for new events or timeout
    wait for activity on the connection 📡 or timeout ⌛
    IF interrupted by a signal 🚨 THEN
        handle the signal (e.g., shutdown) ❌
        CONTINUE to the next iteration ⏩
    ELSE IF timeout occurs ⏳ THEN
        IF notifications exist 📋 THEN
            mark ready for processing ✅
            CONTINUE to the next iteration ⏩
        END IF
        perform periodic health checks 🩺
    END IF

    // 🛠️ Consume available data
    consume data from the connection 📶
    prepare for the next cycle 🔁

END WHILE
And here's the actual implementation:
int result;
PGconn *conn = NULL;
fd_set active_fds, read_fds;
int sock;
struct timeval tv;
int seen = 0;
PGnotify *notify = NULL;
int rc = 0;
long start = get_current_time_ms();
long now, elapsed, remaining_ms;
long last_healthcheck = start;
int ready = -1;

while (running)
{
    if (ready < 0)
    {
        if (conn)
        {
            PQfinish(conn);
        }
        if (!db_connect(&conn, conninfo, channel_name))
        {
            log_printf("ERROR: connection failed: %s", PQerrorMessage(conn));
            return exit_code(conn, EXIT_FAILURE);
        }
        log_printf("connected");

        while (running && (result = db_dequeue(conn, queue_name, batch_limit, batch_limit)) == batch_limit)
            ;
        if (result < 0)
        {
            return exit_code(conn, EXIT_FAILURE);
        }

        FD_ZERO(&active_fds);
        sock = PQsocket(conn);
        FD_SET(sock, &active_fds);

        seen = 0;
        ready = 0;
        last_healthcheck = get_current_time_ms();
        continue;
    }
    else if (ready > 0)
    {
        result = db_dequeue(conn, queue_name, seen, batch_limit);
        if (result == -2)
        {
            return exit_code(conn, EXIT_FAILURE);
        }
        else if (result == -1)
        {
            log_printf("WARN: forcing reconnect...");
            ready = -1;
            continue;
        }
        else if (result != seen)
        {
            log_printf("WARN: expected %d items to be processed, got %d", seen, result);
        }

        seen = 0;
        ready = 0;
        last_healthcheck = get_current_time_ms();
    }

    // Process any pending notifications before select()
    while (running && (notify = PQnotifies(conn)) != NULL)
    {
        PQfreemem(notify);
        if (seen == 0)
        {
            log_printf("NOTIFY called; waking up");
            start = get_current_time_ms(); // Received first notification; reset timer
        }
        seen++;
        PQconsumeInput(conn);
    }

    if (seen >= batch_limit)
    {
        log_printf("processing %d rows... (max reached)", seen);
        ready = 1;
        continue; // Skip select() and process immediately
    }

    now = get_current_time_ms();
    elapsed = now - start;
    remaining_ms = timeout_ms - elapsed;
    if (remaining_ms < 0)
    {
        remaining_ms = 0;
    }

    tv.tv_sec = remaining_ms / 1000;
    tv.tv_usec = (remaining_ms % 1000) * 1000;

    read_fds = active_fds;

    rc = select(sock + 1, &read_fds, NULL, NULL, &tv);
    if (rc < 0)
    {
        if (errno == EINTR)
        {
            if (!running)
            {
                break;
            }
            log_printf("WARN: select interrupted by signal");
            continue;
        }
        log_printf("ERROR: select failed: %s (socket=%d)", strerror(errno), sock);
        break;
    }
    else if (rc == 0)
    {                                  // Timeout occurred;
        start = get_current_time_ms(); // Reset the timer
        if (seen > 0)
        {
            log_printf("processing %d rows... (timeout)", seen);
            ready = 1;
            continue;
        }
        if ((sock = PQsocket(conn)) < 0)
        {
            log_printf("WARN: socket closed; %s", PQerrorMessage(conn));
            ready = -1;
            continue;
        }
        if (now - last_healthcheck >= healthcheck_ms)
        {
            if (!db_healthcheck(conn))
            {
                ready = -1;
                continue;
            }
            else
            {
                last_healthcheck = start;
            }
        }
    }

    if (!FD_ISSET(sock, &read_fds))
    {
        continue;
    }

    do
    {
        if (!PQconsumeInput(conn))
        {
            log_printf("WARN: error consuming input: %s", PQerrorMessage(conn));
            if (PQstatus(conn) != CONNECTION_OK)
            {
                ready = -1;
                break;
            }
        }
    } while (running && PQisBusy(conn));
}
select() in a Nutshell
The select() system call is central to how the program operates. It's a UNIX mechanism that monitors file descriptors (like sockets) to determine if they're ready for I/O operations (e.g., reading or writing).
In this code, select() is used to:
- Monitor the socket for new notifications
- Enforce a timeout for batch processing
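Stripped of the surrounding logic, that usage boils down to the following pattern (a minimal sketch, not the collector's exact code):

#include <sys/select.h>
#include <libpq-fe.h>

// Sketch: wait up to timeout_ms for the libpq socket to become readable.
// Returns >0 if readable, 0 on timeout, <0 on error or signal.
static int wait_for_socket(PGconn *conn, long timeout_ms)
{
    fd_set read_fds;
    struct timeval tv;
    int sock = PQsocket(conn);

    FD_ZERO(&read_fds);
    FD_SET(sock, &read_fds);

    tv.tv_sec = timeout_ms / 1000;
    tv.tv_usec = (timeout_ms % 1000) * 1000;

    return select(sock + 1, &read_fds, NULL, NULL, &tv);
}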
PQconsumeInput and PQnotifies
After select() signals that data is ready, the collector calls PQconsumeInput, which reads data into libpq's internal buffers. It then calls PQnotifies to retrieve any pending notifications and increment the counter.
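In isolation, that consume-then-drain step looks roughly like this (a sketch; the real loop above also resets the batch timer and tracks the count in seen):

#include <libpq-fe.h>

// Sketch: read whatever arrived on the socket, then drain the queued
// notifications, counting them toward the current batch.
static int drain_notifications(PGconn *conn)
{
    int count = 0;
    PGnotify *notify;

    if (!PQconsumeInput(conn)) // pull data into libpq's internal buffer
        return -1;             // connection problem; caller should reconnect

    while ((notify = PQnotifies(conn)) != NULL)
    {
        count++;               // one NOTIFY corresponds to one pending token
        PQfreemem(notify);
    }
    return count;
}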
Read more about libpq's async API.
Compile & Run
To compile, verify that you have openssl@3 and libpq@5 installed, then use the provided Makefile.
Run the Collector
Use the following command to build and run the collector executable with example configuration variables:
make && \
MAILROOM_BATCH_LIMIT=3 \
MAILROOM_BATCH_TIMEOUT=5000 \
MAILROOM_DATABASE_URL="dbname=mailroom" \
MAILROOM_SECRET_KEY="cafebabecafebabecafebabecafebabecafebabecafebabecafebabecafebabe" \
./collector
Once configured and started, it will log its activity:
2024/04/20 13:37:00 [PG] configured; channel=token_insert queue=mailroom limit=3 timeout=5000ms healthcheck-interval=270000ms
2024/04/20 13:37:00 [PG] connecting to host=/tmp port=5432 dbname=mailroom user=ogu sslmode=disable
2024/04/20 13:37:00 [PG] connected
Insert Accounts and Observe Batching
In another terminal, insert 5 accounts:
printf "%.0sINSERT INTO accounts (email, login) VALUES ('user' || md5(random()::text) || '@fake.mail', 'user' || substr(md5(random()::text), 1, 20));\n" {1..5} | \
psql -d mailroom
You'll observe the collector immediately process the first batch of three items. After a 5-second delay (as defined by MAILROOM_BATCH_TIMEOUT), it processes the remaining two in a second batch:
2024/04/20 13:37:00 [PG] NOTIFY called; waking up
2024/04/20 13:37:00 [PG] processing 3 rows... (max reached)
1,userb183abb7a25d04027061e6b8d8d8e7fa@fake.mail,userb0bf075b82b892f53d97,gVRNesi-opSvs3ntPfr9DzSn_JwbOD04VVIurQSCOFzzd3BOM3WBDL3SOtDjMxKLd6csSn8_p9hemXHIUxIjPg,78092,1,user43b01ba9686c886473e526429dd2c672@fake.mail,userf420078dba4fd5a91de2,--DTy5LsbDeLP_AweXIPSjL3_avQMT5cH_bRxPy1uxQLVhXKaw7Oxd7NYkcJ6MZmnnqWqTcBPHA5z7bqunXEAA,25778,1,user46f81dfd34b91a1904ac4524193575aa@fake.mail,user6d91baab56d2823b326d,ryooWewe3OTxIGF1Gjl5Vvl8BsXoqWVbCAt1t6J--_KX1SM4DbyCes4yn75OWVe60G4MMZdv4byRh1wy-Clvxw,78202
2024/04/20 13:37:00 [PG] NOTIFY called; waking up
2024/04/20 13:37:05 [PG] processing 2 rows... (timeout)
1,user12d2722e1c07b0a531ea69ae125d4697@fake.mail,user853ae29eefc5d44a6bc6,4pmew2o2EOAZBDHWvJBcixJftpRCb8uyXZhzN12EOcrLBmzc4ic9avwd9dla09pIiKIoqW5iIwMfoXLEM3_LGw,38806,1,user9497d0e033019fcf3198eecb053ba40e@fake.mail,userfcde338dba96cc419613,ANLMa-1y37VLCDqK0wnfEFhUVzHsWpaNGV2ttI8m3o6_lbbYOKmp3hP7Q8H8ZQRNMPAj4xsSqC26nesfVZLgzQ,89897
Testing Reconnect Behavior
To simulate a dropped connection, open another terminal, connect to mailroom via psql, and run:
SELECT pg_terminate_backend(pid)
FROM pg_stat_activity
WHERE
  datname = 'mailroom'
  AND pid <> pg_backend_pid();
After the connection is killed, select() wakes up, causing PQconsumeInput() to fail with an error. The collector logs a reconnect attempt, and once reconnected, it resumes processing without losing track of tokens queued during the downtime.
2024/04/20 13:37:42 [PG] WARN: error consuming input: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
2024/04/20 13:37:42 [PG] connecting to host=/tmp port=5432 dbname=mailroom user=ogu sslmode=disable
2024/04/20 13:37:42 [PG] connected
Further Improvements
The mailroom system we are building is relatively basic. Advanced streaming solutions often employ more sophisticated batching strategies.
Priority Queues
Our current PostgreSQL-based queueing mechanism processes tokens without distinguishing between their types, so it can't prioritize critical ones, such as password recovery, over less urgent emails like account activations. At present, a batch of 10 emails could be 10 emails of the same type or a mix, depending on what happens to be queued. While effective, this design leaves room for improvement through prioritization or smarter batching strategies.
Adaptive Batching
User activity is rarely consistent. There are likely to be bursts of high activity, far exceeding our daily/hourly quotas, interspersed with periods of minimal activity.
Rather than using fixed limits and timeouts, we can adjust batch size and timeout values dynamically in response to real-time conditions. During low-traffic periods, the batch size can be increased to improve efficiency. During peak hours, it can be reduced to minimize delays. We could even incorporate machine learning to automate these adjustments.
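As a toy illustration of the idea (not part of mailroom), the batch limit could be scaled against a recent arrival rate; the thresholds below are made up:

// Hypothetical sketch: scale the batch limit against the recent arrival rate.
// Low traffic -> larger batches (efficiency); high traffic -> smaller batches
// (lower latency per batch).
static int adaptive_batch_limit(double arrivals_per_sec, int base_limit)
{
    if (arrivals_per_sec < 0.1)
        return base_limit * 2;                           // quiet: batch more aggressively
    if (arrivals_per_sec > 1.0)
        return base_limit / 2 > 0 ? base_limit / 2 : 1;  // busy: flush smaller batches sooner
    return base_limit;
}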
Next
In the upcoming Part 3, we'll build the sender component to handle email delivery with rate-limiting.