January 10, 2025 · c, database, io, mailroom and postgres

Continuing from the previous post, this installment explores how to incorporate PostgreSQL notifications with backpressure to efficiently batch tokens.

Previously, we examined PostgreSQL triggers and their role in managing automated workflows. Although we briefly introduced NOTIFY statements, their full potential has yet to be explored. In this segment, we'll bring notification events into focus and develop a collector to process the tokens accumulating in the database.

Getting Started

Setting Up Your Environment

Clone the tetsuo/mailroom repository:

git clone https://github.com/tetsuo/mailroom.git

Run the following command to create a new database in PostgreSQL:

createdb mailroom

Then, navigate to the migrations folder and run:

psql -d mailroom < 0_init.up.sql

Alternatively, you can use go-migrate, which is often my preference.

Inspect the Initial State of the Database

Before adding any mock data, let's take a look at the initial state of the jobs table:

psql -d mailroom -c "SELECT * FROM jobs;"

You should see one row with job_type set to mailroom and last_seq set to zero:

 job_type | last_seq
----------+----------
 mailroom |        0
(1 row)

Listening for Notifications

In a separate terminal, connect to the database again:

psql -d mailroom

Then enable listening on the token_insert channel:

LISTEN token_insert;

Populate with Sample Data

Now, let's insert some dummy data into the accounts table. This command will insert 3 records with randomized email and login fields:

printf "%.0sINSERT INTO accounts (email, login) VALUES ('user' || md5(random()::text) || '@fake.mail', 'user' || substr(md5(random()::text), 1, 20));\n" {1..3} | \
    psql -d mailroom

Here's an example account record:

-[ ACCOUNT 1 ]-------------------------------------------------------------------
id                | 1
email             | usere3213152e8cdf722466a011b1eaa3c98@fake.mail
status            | provisioned
login             | user85341405cb33cbe89a5f
created_at        | 1735709763
status_changed_at |
activated_at      |
suspended_at      |
unsuspended_at    |

The corresponding token record generated by the trigger function:

-[ TOKEN 1 ]---------------------------------------------------------------------
id          | 1
action      | activation
secret      | \x144d3ba23d4e60f80d3cb5cf25783539ba267af34aecd71d7cc888643c912fb7
code        | 06435
account     | 1
expires_at  | 1735710663
consumed_at |
created_at  | 1735709763

psql only displays asynchronous notifications after it executes a command, so if they haven't appeared yet, enter a lone semicolon (;) in the psql client to display them:

mailroom=# LISTEN token_insert;
LISTEN
mailroom=# ;
Asynchronous notification "token_insert" received from server process with PID 5148.
Asynchronous notification "token_insert" received from server process with PID 5148.
Asynchronous notification "token_insert" received from server process with PID 5148.

Dequeuing Pending Jobs

These notifications indicate that it's time to execute the query we defined in Part 1:

WITH token_data AS (
  SELECT
    t.account,
    t.secret,
    t.code,
    t.expires_at,
    t.id,
    t.action,
    a.email,
    a.login
  FROM
    jobs
  JOIN tokens t
    ON t.id > jobs.last_seq
    AND t.expires_at > EXTRACT(EPOCH FROM NOW())
    AND t.consumed_at IS NULL
    AND t.action IN ('activation', 'password_recovery')
  JOIN accounts a
    ON a.id = t.account
    AND (
      (t.action = 'activation' AND a.status = 'provisioned')
      OR (t.action = 'password_recovery' AND a.status = 'active')
    )
  WHERE
    jobs.job_type = 'mailroom'
  ORDER BY id ASC
  LIMIT 10
),
updated_jobs AS (
  UPDATE
    jobs
  SET
    last_seq = (SELECT MAX(id) FROM token_data)
  WHERE
    jobs.job_type = 'mailroom'
    AND EXISTS (SELECT 1 FROM token_data)
  RETURNING last_seq
)
SELECT
  td.action,
  td.email,
  td.login,
  td.secret,
  td.code
FROM
  token_data td
ORDER BY
  td.id;

... which accomplishes two things:

  1. Retrieves unexpired, unconsumed tokens whose id is greater than last_seq, along with the corresponding account data.
  2. Advances last_seq to the highest id in the batch so the same tokens aren't selected again.

In other words, this query retrieves a batch of tokens and advances the cursor:

-[ RECORD 1 ]--------------------------------------------------------------
action | activation
email  | usere3213152e8cdf722466a011b1eaa3c98@fake.mail
login  | user85341405cb33cbe89a5f
secret | \x144d3ba23d4e60f80d3cb5cf25783539ba267af34aecd71d7cc888643c912fb7
code   | 06435
-[ RECORD 2 ]--------------------------------------------------------------
action | activation
email  | user41e8b6830c76870594161150051f8215@fake.mail
login  | user2491d87beb8950b4abd7
secret | \x27100e07220b62e849e788e6554fede60c96e967c4aa62db7dc45150c51be23f
code   | 80252
-[ RECORD 3 ]--------------------------------------------------------------
action | activation
email  | user7bb11e235c85afe12076884d06910be4@fake.mail
login  | user91ab8536cb05c37ff46a
secret | \xa9763eec727835bd97b79018b308613268d9ea0db70493fd212771c9b7c3bcb2
code   | 31620
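To make the cursor semantics concrete, here is a small in-memory model of what the query does, written in C. It is a sketch for illustration only: struct token and dequeue_batch() are hypothetical names, the input is assumed to be sorted by id, and the account-status join is left out. It applies the same filters as the JOIN conditions, takes at most `limit` rows, and then moves last_seq to the highest id it took.

```c
#include <stddef.h>

/* Hypothetical in-memory stand-in for a row in the tokens table. */
struct token {
  long id;
  long expires_at; /* Unix seconds */
  int consumed;    /* nonzero once the token has been used */
};

/*
 * Copies up to `limit` eligible tokens with id > *last_seq into `out`
 * (input must be sorted by ascending id), then advances *last_seq to the
 * highest id taken. Returns the number of tokens copied.
 */
size_t dequeue_batch(const struct token *tokens, size_t n, long *last_seq,
                     long now, size_t limit, struct token *out)
{
  size_t taken = 0;

  for (size_t i = 0; i < n && taken < limit; i++)
  {
    const struct token *t = &tokens[i];

    /* Same filters as the JOIN: past the cursor, unexpired, unconsumed. */
    if (t->id <= *last_seq || t->expires_at <= now || t->consumed)
      continue;
    out[taken++] = *t;
  }

  /* Equivalent of SET last_seq = MAX(id), only when the batch is non-empty. */
  if (taken > 0)
    *last_seq = out[taken - 1].id;

  return taken;
}
```

Tokens skipped because they had expired or were already consumed are never revisited, which matches the SQL: once last_seq moves past them, they fall out of the window for good.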

Overview

The collector subscribes to a PostgreSQL notification channel, tracks incoming events, and executes a query when either a row limit (based on the number of received notifications) or a timeout is reached.

Real-time Responsiveness

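Instead of continuously polling the database for pending jobs, the collector tracks notifications. This lets it gauge how many rows to dequeue and query the database only when there's actual work to do. (PostgreSQL folds duplicate notifications with identical payloads sent within a single transaction into one event, so the count can undershoot when many tokens are inserted in one transaction.)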

Responsiveness vs. Budget Constraints

However, the collector must also account for constraints imposed by our email provider and, by extension, our budget. For instance, with Amazon SES charging $0.10 per 1,000 emails, a monthly budget of $100 translates to (assuming a 30-day month):

  • 1,000,000 emails per month,
  • 33,333 emails per day,
  • 1,389 emails per hour,
  • 23 emails per minute, and
  • 0.386 emails per second.

At this rate, we would need to buffer for approximately 26 seconds for batches containing 10 destinations each:

10 emails / 0.386 emails per second ≈ 25.9 seconds

... which assumes we are operating at full capacity within our budget.
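The same arithmetic can be written as a small C function, which makes it easy to re-run with a different budget or batch size. compute_budget() and struct send_budget are hypothetical names; like the figures above, the sketch assumes a 30-day month and a flat per-email price.

```c
/* Hypothetical result type for the budget arithmetic. */
struct send_budget {
  double emails_per_month;
  double emails_per_second;
  double batch_timeout_s; /* seconds of budget needed to "earn" one full batch */
};

/* Assumes a 30-day month and a flat price per 1,000 emails. */
struct send_budget compute_budget(double monthly_usd, double usd_per_1000,
                                  int batch_limit)
{
  struct send_budget b;
  const double seconds_per_month = 30.0 * 24 * 60 * 60; /* 2,592,000 */

  b.emails_per_month = monthly_usd / usd_per_1000 * 1000.0;
  b.emails_per_second = b.emails_per_month / seconds_per_month;
  b.batch_timeout_s = batch_limit / b.emails_per_second;
  return b;
}
```

For example, compute_budget(100.0, 0.10, 10) yields about 1,000,000 emails per month, about 0.386 emails per second, and a batch timeout of roughly 25.9 seconds.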

Batch Processing and Flow-Control

To accommodate these constraints and avoid overwhelming downstream components (e.g., the email sender), we'll implement a form of backpressure by pulling tokens from the database in controlled batches, fetching more only when the collector is ready.

Here's how it works in practice:

Batch Limit

The maximum number of email destinations in a single batch.

The collector queries the database for at most N tokens at a time (where N is the batch limit). Even if 500 tokens are waiting in the database, the collector will only take, say, 10 at a time. This caps how many tokens can leave the database and head downstream in a single query.

Batch Timeout

How long to wait for enough notifications to accumulate to fill a batch.

Instead of hammering the database every millisecond or immediately after a few notifications arrive, the collector also waits up to X milliseconds, counted from the first notification, before processing tokens (where X is the batch timeout). If fewer than the batch limit have arrived during that window, the collector still dequeues whatever did arrive, but it won't immediately pull more. In effect, this sets an upper bound on how long a new token can linger before being handed over to the email sender.

Example

If you set:

  • A batch timeout of 30 seconds.
  • A batch limit of 10 notifications.

This means:

  • If 10 notifications arrive in quick succession, the batch is triggered immediately.
  • If fewer than 10 arrive within 30 seconds of the first one, the batch is triggered when the timeout expires.
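The two trigger conditions boil down to a single predicate. The sketch below uses a hypothetical should_flush() helper rather than the collector's actual code, which inlines equivalent checks in its main loop; it assumes the window starts when the first notification arrives.

```c
#include <stdbool.h>

/*
 * Hypothetical helper: should the collector dequeue now?
 * `seen` counts notifications since the current window opened at
 * `window_start_ms` (the arrival time of the first notification).
 */
bool should_flush(int seen, int batch_limit, long now_ms,
                  long window_start_ms, long timeout_ms)
{
  if (seen <= 0)
    return false; /* nothing pending: keep waiting */
  if (seen >= batch_limit)
    return true; /* batch is full: flush immediately */

  /* Partial batch: flush once the timeout has elapsed. */
  return now_ms - window_start_ms >= timeout_ms;
}
```

With the example settings, should_flush(10, 10, 1000, 0, 30000) is true (full batch), while should_flush(4, 10, 29999, 0, 30000) stays false until the clock reaches 30,000 ms.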

Implementation

The collector is written in C and interacts with PostgreSQL using libpq.

Connecting to PostgreSQL

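The query from Part 1 lives in the db.c file, along with other DB-related functions. Each time the collector connects (including after a reconnect), it issues a LISTEN command on the specified channel and creates the prepared statements for subsequent queries. Both are scoped to the database session, so they have to be set up again on every new connection.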

db.c

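#include <stdbool.h>
#include <libpq-fe.h>

// db_listen(), db_prepare_statement(), the *_PREPARED_STMT_NAME macros and
// `query` are defined elsewhere (not shown).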

// Establishes a connection to the database, listens for notifications, and
// creates prepared statements.
bool db_connect(PGconn **conn, const char *conninfo, const char *channel)
{
  *conn = PQconnectdb(conninfo);

  return PQstatus(*conn) == CONNECTION_OK &&
         db_listen(*conn, channel) &&
         db_prepare_statement(*conn, POSTGRES_HEALTHCHECK_PREPARED_STMT_NAME, "SELECT 1") &&
         db_prepare_statement(*conn, POSTGRES_DATA_PREPARED_STMT_NAME, query);
}

Fetching & Serializing Email Payloads

When notifications arrive, the collector fetches tokens in batches and writes the results directly to stdout; processing continues until it has exhausted the queued tokens or an error occurs. The db_dequeue() function handles this logic.

The results are output as line-delimited batches, formatted as comma-separated values in the following order:

action,email,username,secret,code

This schema is repeated for each row in the batch, all included in a single line.

  • action: Numeric representation of the email action type (e.g., 1 for activation, 2 for password recovery).
  • email: Recipient's email address.
  • username: Recipient's login name.
  • secret: A URL-safe Base64 string containing the signed token.
  • code: Five-digit numeric code; only password recovery includes it in the signed data.

Example

Here, the first line contains a batch of three rows: one password recovery followed by two account activations. The second line contains a single account activation.

2,john.doe123@fakemail.test,johndoe,0WEKrnjY_sTEqogrR6qsp7r7Vg4SQ_0iM_1La5hHp5p31nbkrHUBS0Cz9T24iBDCk6CFqO7tJTihpsOVuHYgLg,35866,1,jane.smith456@notreal.example,janesmith,BfQXx31qfY2IJFTtzAp21IdeW0dDIxUT1Ejf3tYJDukNsfaxxOfldwL-lEfVy4SEkZ_v18rf-EWsvWXH5qgvIg,24735,1,emma.jones789@madeup.mail,emmajones,jxrR5p72UWTQ8JiU2DrqjZ-K8L4t8i454S9NtPkVn4-1-bin3ediP0zHMDQU2J_iIyzH4XmNtzpXZhjV0n5xcA,25416
1,sarah.connor999@unreal.mail,resistance1234,zwhCIthd12DqpQSGB57S9Ky-OXV_8H0e8aHOv_kWoggIuAZ2sc-aQVpIoQ-M--PjwVfdIIxiXkv_WjRjGI57zA,38022
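For illustration, here is a minimal C sketch that produces lines in this format. struct email_row, action_code(), and write_batch() are hypothetical, and the fields are assumed to arrive already encoded; the repo's db_dequeue() presumably writes directly from its query results instead.

```c
#include <stdio.h>
#include <string.h>

/* Hypothetical row type holding already-encoded fields for one email. */
struct email_row {
  const char *action;     /* "activation" or "password_recovery" */
  const char *email;
  const char *login;
  const char *secret_b64; /* URL-safe Base64 signed token */
  const char *code;       /* numeric code */
};

/* Maps the action name to its numeric code (0 if unknown). */
static int action_code(const char *action)
{
  if (strcmp(action, "activation") == 0)
    return 1;
  if (strcmp(action, "password_recovery") == 0)
    return 2;
  return 0;
}

/*
 * Writes one batch as a single line: five comma-separated fields per row,
 * rows joined by commas, terminated by '\n'. Returns 0 on success, -1 on a
 * write error.
 */
int write_batch(FILE *out, const struct email_row *rows, size_t n)
{
  for (size_t i = 0; i < n; i++)
  {
    if (fprintf(out, "%s%d,%s,%s,%s,%s", i > 0 ? "," : "",
                action_code(rows[i].action), rows[i].email, rows[i].login,
                rows[i].secret_b64, rows[i].code) < 0)
      return -1;
  }

  /* Flush per batch so a downstream reader on a pipe sees it right away. */
  if (fputc('\n', out) == EOF || fflush(out) == EOF)
    return -1;
  return 0;
}
```

The format assumes no field contains a comma. That holds for the URL-safe Base64 alphabet and the numeric codes, but it is an assumption about email addresses and logins. The explicit fflush() matters when stdout is a pipe to the sender, because stdio then buffers output in full blocks rather than line by line.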

Signing & Verifying Tokens

During the dequeue operation, the token's secret is signed with HMAC-SHA256, and the result is encoded in URL-safe Base64.

The signed data consists of:

  • A path name (/activate or /recover)
  • The original secret (plus the code, in the case of recovery)

The encoded output then carries the original secret together with the resulting cryptographic signature.

db.c

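#include <stddef.h>
#include <string.h>

// Builds the message to be signed into `output`, which must hold at least
// 45 bytes ("/recover" + 32-byte secret + 5-byte code). Returns 0 for an
// unknown action.
static size_t construct_signature_data(char *output, const char *action,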
                                       const unsigned char *secret, const char *code)
{
  size_t offset = 0;

  if (strcmp(action, "activation") == 0)
  {
    memcpy(output, "/activate", 9); // "/activate" is 9 bytes
    offset = 9;
    memcpy(output + offset, secret, 32);
    offset += 32;
  }
  else if (strcmp(action, "password_recovery") == 0)
  {
    memcpy(output, "/recover", 8); // "/recover" is 8 bytes
    offset = 8;
    memcpy(output + offset, secret, 32);
    offset += 32;
    memcpy(output + offset, code, 5); // code is 5 bytes
    offset += 5;
  }

  return offset; // Total length of the constructed data
}

Signing lets the server that receives a link check its authenticity without an immediate database call. If you'd like to see how these secrets can be verified, there is a verifyHmac.js script in the repo for reference.

  • Be sure to handle expired tokens. One approach is to include the expires_at value in the signed data so validity can be checked without a DB lookup. But since tokens remain valid for 15 minutes, a more thorough approach is to also cache consumed tokens until they naturally expire, preventing reuse during their validity period.
  • Also, regularly rotate your signing key.
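The URL-safe Base64 step can be sketched in portable C as a plain RFC 4648 (section 5) encoder that omits '=' padding; dropping the padding is an assumption based on the sample values, none of which contain '='. base64url_encode() is a hypothetical helper, and the repo may rely on a library routine instead. A 64-byte input encodes to exactly 86 characters, which matches the length of the secret values in the sample output.

```c
#include <stddef.h>

/*
 * URL-safe Base64 (RFC 4648, section 5) without '=' padding.
 * `out` must hold at least (4 * n + 2) / 3 + 1 bytes; the result is
 * NUL-terminated. Returns the encoded length (excluding the NUL).
 */
size_t base64url_encode(const unsigned char *in, size_t n, char *out)
{
  static const char alphabet[] =
      "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_";
  size_t o = 0, i = 0;

  /* Each full 3-byte group becomes 4 characters. */
  for (; i + 2 < n; i += 3)
  {
    unsigned long v = ((unsigned long)in[i] << 16) |
                      ((unsigned long)in[i + 1] << 8) | in[i + 2];
    out[o++] = alphabet[(v >> 18) & 0x3F];
    out[o++] = alphabet[(v >> 12) & 0x3F];
    out[o++] = alphabet[(v >> 6) & 0x3F];
    out[o++] = alphabet[v & 0x3F];
  }

  /* A trailing 1 or 2 bytes become 2 or 3 characters (no padding). */
  if (i < n)
  {
    unsigned long v = (unsigned long)in[i] << 16;
    if (i + 1 < n)
      v |= (unsigned long)in[i + 1] << 8;
    out[o++] = alphabet[(v >> 18) & 0x3F];
    out[o++] = alphabet[(v >> 12) & 0x3F];
    if (i + 1 < n)
      out[o++] = alphabet[(v >> 6) & 0x3F];
  }

  out[o] = '\0';
  return o;
}
```

The only differences from standard Base64 are the '-' and '_' characters in place of '+' and '/', which keeps the value safe to drop into a URL path or query string without percent-encoding.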

Putting It All Together

Environment Variables

In main.c, you'll see references to environment variables like MAILROOM_BATCH_TIMEOUT, MAILROOM_BATCH_LIMIT, and MAILROOM_SECRET_KEY (a 64-character hex string). Refer to the README file for the full list.
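Reading these variables safely takes a little care, since getenv() returns NULL for unset variables and atoi() silently accepts junk. The helpers below are a hypothetical sketch: parse_positive_long() would suit MAILROOM_BATCH_LIMIT and MAILROOM_BATCH_TIMEOUT, and decode_hex_key() turns the 64-character MAILROOM_SECRET_KEY into the 32 raw key bytes an HMAC call needs. The validation rules are assumptions; the README is the authority on what the collector actually accepts.

```c
#include <errno.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

/* Parses a strictly positive decimal integer; rejects NULL, junk, overflow. */
bool parse_positive_long(const char *s, long *out)
{
  if (s == NULL || *s == '\0')
    return false;

  char *end;
  errno = 0;
  long v = strtol(s, &end, 10);
  if (errno != 0 || *end != '\0' || v <= 0)
    return false;

  *out = v;
  return true;
}

/* Value of one hex digit, or -1 if `c` is not [0-9a-fA-F]. */
static int hex_value(char c)
{
  if (c >= '0' && c <= '9') return c - '0';
  if (c >= 'a' && c <= 'f') return c - 'a' + 10;
  if (c >= 'A' && c <= 'F') return c - 'A' + 10;
  return -1;
}

/* Decodes exactly 64 hex characters into a 32-byte key. */
bool decode_hex_key(const char *hex, unsigned char key[32])
{
  if (hex == NULL || strlen(hex) != 64)
    return false;

  for (size_t i = 0; i < 32; i++)
  {
    int hi = hex_value(hex[2 * i]);
    int lo = hex_value(hex[2 * i + 1]);
    if (hi < 0 || lo < 0)
      return false;
    key[i] = (unsigned char)((hi << 4) | lo);
  }
  return true;
}
```

For example, parse_positive_long(getenv("MAILROOM_BATCH_TIMEOUT"), &timeout_ms) rejects an unset variable as well as values like 0 or 10ms.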

Loop Overview

At a high level, the main loop repeatedly:

  • Dequeues and processes ready batches
  • Checks for new notifications
  • Waits on select() for either database activity or a timeout
  • Performs periodic health checks
  • Reconnects to the database if needed

When the batch limit is reached or the timeout occurs, the collector executes the dequeue query. If it detects a broken connection, it reconnects, drains any tokens that queued up in the meantime, and resumes; if the reconnect attempt itself fails, it exits.

Here's the pseudo-code representation:

// 🌟 Main processing loop
WHILE the application is running 🔄
    // 🔌 Handle reconnection if needed
    IF the connection is not ready ❌ THEN
        reconnect to the database 🔄
        initialize the connection ✅
        drain tokens queued while disconnected 📤
        reset counters 🔢
        CONTINUE to the next iteration ⏩
    END IF

    // 📦 Process ready batches
    IF ready for processing ✅ THEN
        dequeue and process a batch of items 📤
        reset state for the next cycle 🔁
        CONTINUE to the next iteration ⏩
    END IF

    // 🛎️ Handle pending notifications
    process all incoming notifications 📥
    IF notifications reach the batch limit 🚨 THEN
        mark ready for processing ✅
        CONTINUE to the next iteration ⏩
    END IF

    // ⏱️ Wait for new events or timeout
    wait for activity on the connection 📡 or timeout ⌛
    IF interrupted by a signal 🚨 THEN
        handle the signal (e.g., shutdown) ❌
        CONTINUE to the next iteration ⏩
    ELSE IF timeout occurs ⏳ THEN
        IF notifications exist 📋 THEN
            mark ready for processing ✅
            CONTINUE to the next iteration ⏩
        END IF
        perform periodic health checks 🩺
    END IF

    // 🛠️ Consume available data
    consume data from the connection 📶
    prepare for the next cycle 🔁
END WHILE

And here's the actual implementation:

main.c

int result = 0; // initialized so it is never read uninitialized below

PGconn *conn = NULL;

fd_set active_fds, read_fds;
int sock;

struct timeval tv;
int seen = 0;

PGnotify *notify = NULL;
int rc = 0;

long start = get_current_time_ms();
long now, elapsed, remaining_ms;

long last_healthcheck = start;

int ready = -1;

while (running)
{
  if (ready < 0)
  {
    if (conn)
    {
      PQfinish(conn);
    }

    if (!db_connect(&conn, conninfo, channel_name))
    {
      log_printf("ERROR: connection failed: %s", PQerrorMessage(conn));
      return exit_code(conn, EXIT_FAILURE);
    }

    log_printf("connected");

    while (running && (result = db_dequeue(conn, queue_name, batch_limit, batch_limit)) == batch_limit)
      ;

    if (result < 0)
    {
      return exit_code(conn, EXIT_FAILURE);
    }

    FD_ZERO(&active_fds);
    sock = PQsocket(conn);
    FD_SET(sock, &active_fds);

    seen = 0;
    ready = 0;
    last_healthcheck = get_current_time_ms();

    continue;
  }
  else if (ready > 0)
  {
    result = db_dequeue(conn, queue_name, seen, batch_limit);
    if (result == -2)
    {
      return exit_code(conn, EXIT_FAILURE);
    }
    else if (result == -1)
    {
      log_printf("WARN: forcing reconnect...");
      ready = -1;
      continue;
    }
    else if (result != seen)
    {
      log_printf("WARN: expected %d items to be processed, got %d", seen, result);
    }

    seen = 0;
    ready = 0;
    last_healthcheck = get_current_time_ms();
  }

  // Process any pending notifications before select()
  while (running && (notify = PQnotifies(conn)) != NULL)
  {
    PQfreemem(notify);
    if (seen == 0)
    {
      log_printf("NOTIFY called; waking up");
      start = get_current_time_ms(); // Received first notification; reset timer
    }
    seen++;
    PQconsumeInput(conn);
  }

  if (seen >= batch_limit)
  {
    log_printf("processing %d rows... (max reached)", seen);

    ready = 1;
    continue; // Skip select() and process immediately
  }

  now = get_current_time_ms();
  elapsed = now - start;
  remaining_ms = timeout_ms - elapsed;

  if (remaining_ms < 0)
  {
    remaining_ms = 0;
  }

  tv.tv_sec = remaining_ms / 1000;
  tv.tv_usec = (remaining_ms % 1000) * 1000;

  read_fds = active_fds;

  rc = select(sock + 1, &read_fds, NULL, NULL, &tv);

  if (rc < 0)
  {
    if (errno == EINTR)
    {
      if (!running)
      {
        break;
      }
      log_printf("WARN: select interrupted by signal");
      continue;
    }
    log_printf("ERROR: select failed: %s (socket=%d)", strerror(errno), sock);
    break;
  }
  else if (rc == 0)
  {
    // Timeout occurred; reset the timer
    start = get_current_time_ms();

    if (seen > 0)
    {
      log_printf("processing %d rows... (timeout)", seen);

      ready = 1;
      continue;
    }

    if ((sock = PQsocket(conn)) < 0)
    {
      log_printf("WARN: socket closed; %s", PQerrorMessage(conn));
      ready = -1;
      continue;
    }

    // `now` was sampled before select(); `start` was refreshed just above
    if (start - last_healthcheck >= healthcheck_ms)
    {
      if (!db_healthcheck(conn))
      {
        ready = -1;
        continue;
      }
      else
      {
        last_healthcheck = start;
      }
    }
  }

  if (!FD_ISSET(sock, &read_fds))
  {
    continue;
  }

  do
  {
    if (!PQconsumeInput(conn))
    {
      log_printf("WARN: error consuming input: %s", PQerrorMessage(conn));
      if (PQstatus(conn) != CONNECTION_OK)
      {
        ready = -1;
        break;
      }
    }
  } while (running && PQisBusy(conn));
}

select() in a Nutshell

The select() system call is central to how the program operates. It's a UNIX mechanism that monitors file descriptors (like sockets) to determine if they're ready for I/O operations (e.g., reading or writing).

In this code, select() is used to:

  • Monitor the socket for new notifications
  • Enforce a timeout for batch processing
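A self-contained sketch of that pattern, with a pipe standing in for the libpq socket: wait_readable() is a hypothetical helper that converts a millisecond budget into a struct timeval (the same arithmetic as the main loop), rebuilds the fd_set on every call because select() overwrites it, and reports whether the descriptor became readable before the timeout. Retrying on EINTR is left to the caller, as in the collector.

```c
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>

/*
 * Waits up to `timeout_ms` for `fd` to become readable.
 * Returns 1 if readable, 0 on timeout, -1 on error (errno set by select()).
 */
int wait_readable(int fd, long timeout_ms)
{
  fd_set read_fds;
  struct timeval tv;

  if (timeout_ms < 0)
    timeout_ms = 0; /* same clamp as the collector's remaining_ms */
  tv.tv_sec = timeout_ms / 1000;
  tv.tv_usec = (timeout_ms % 1000) * 1000;

  /* select() modifies the set in place, so rebuild it for every call. */
  FD_ZERO(&read_fds);
  FD_SET(fd, &read_fds);

  int rc = select(fd + 1, &read_fds, NULL, NULL, &tv);
  if (rc <= 0)
    return rc;
  return FD_ISSET(fd, &read_fds) ? 1 : 0;
}
```

Called on a fresh pipe, it returns 0 once the timeout passes; after a byte is written to the pipe, it returns 1 without waiting.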

PQconsumeInput and PQnotifies

After select() signals that data is ready, the collector calls PQconsumeInput, which reads the available data into libpq's internal buffers. It then calls PQnotifies repeatedly to retrieve each pending notification, incrementing its seen counter once per notification.

Read more about libpq's async API.

Compile & Run

To compile, verify that you have openssl@3 and libpq@5 installed, then use the provided Makefile.

Run the Collector

Use the following command to build and run the collector executable with example configuration variables:

make && \
  MAILROOM_BATCH_LIMIT=3 \
  MAILROOM_BATCH_TIMEOUT=5000 \
  MAILROOM_DATABASE_URL="dbname=mailroom" \
  MAILROOM_SECRET_KEY="cafebabecafebabecafebabecafebabecafebabecafebabecafebabecafebabe" \
  ./collector

Once configured and started, it will log its activity:

2024/04/20 13:37:00 [PG] configured; channel=token_insert queue=mailroom limit=3 timeout=5000ms healthcheck-interval=270000ms
2024/04/20 13:37:00 [PG] connecting to host=/tmp port=5432 dbname=mailroom user=ogu sslmode=disable
2024/04/20 13:37:00 [PG] connected

Insert Accounts and Observe Batching

In another terminal, insert 5 accounts:

printf "%.0sINSERT INTO accounts (email, login) VALUES ('user' || md5(random()::text) || '@fake.mail', 'user' || substr(md5(random()::text), 1, 20));\n" {1..5} | \
    psql -d mailroom

You'll observe the collector immediately process the first batch of three items. The remaining two open a new window, and once the 5-second batch timeout (MAILROOM_BATCH_TIMEOUT=5000) elapses, the collector processes them in a second batch:

2024/04/20 13:37:00 [PG] NOTIFY called; waking up
2024/04/20 13:37:00 [PG] processing 3 rows... (max reached)
1,userb183abb7a25d04027061e6b8d8d8e7fa@fake.mail,userb0bf075b82b892f53d97,gVRNesi-opSvs3ntPfr9DzSn_JwbOD04VVIurQSCOFzzd3BOM3WBDL3SOtDjMxKLd6csSn8_p9hemXHIUxIjPg,78092,1,user43b01ba9686c886473e526429dd2c672@fake.mail,userf420078dba4fd5a91de2,--DTy5LsbDeLP_AweXIPSjL3_avQMT5cH_bRxPy1uxQLVhXKaw7Oxd7NYkcJ6MZmnnqWqTcBPHA5z7bqunXEAA,25778,1,user46f81dfd34b91a1904ac4524193575aa@fake.mail,user6d91baab56d2823b326d,ryooWewe3OTxIGF1Gjl5Vvl8BsXoqWVbCAt1t6J--_KX1SM4DbyCes4yn75OWVe60G4MMZdv4byRh1wy-Clvxw,78202
2024/04/20 13:37:00 [PG] NOTIFY called; waking up
2024/04/20 13:37:05 [PG] processing 2 rows... (timeout)
1,user12d2722e1c07b0a531ea69ae125d4697@fake.mail,user853ae29eefc5d44a6bc6,4pmew2o2EOAZBDHWvJBcixJftpRCb8uyXZhzN12EOcrLBmzc4ic9avwd9dla09pIiKIoqW5iIwMfoXLEM3_LGw,38806,1,user9497d0e033019fcf3198eecb053ba40e@fake.mail,userfcde338dba96cc419613,ANLMa-1y37VLCDqK0wnfEFhUVzHsWpaNGV2ttI8m3o6_lbbYOKmp3hP7Q8H8ZQRNMPAj4xsSqC26nesfVZLgzQ,89897

Testing Reconnect Behavior

To simulate a dropped connection, open another terminal, connect to mailroom via psql, and run:

SELECT pg_terminate_backend(pid)
FROM pg_stat_activity
WHERE
  datname = 'mailroom'
  AND pid <> pg_backend_pid();

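After the connection is killed, select() wakes up and PQconsumeInput() fails with an error. The collector logs the failure, reconnects, and resumes processing. Tokens inserted during the downtime aren't lost: NOTIFY events sent while no session is listening are simply dropped, but on reconnect the collector drains everything past last_seq before it goes back to waiting for notifications.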

2024/04/20 13:37:42 [PG] WARN: error consuming input: server closed the connection unexpectedly
        This probably means the server terminated abnormally
        before or while processing the request.

2024/04/20 13:37:42 [PG] connecting to host=/tmp port=5432 dbname=mailroom user=ogu sslmode=disable
2024/04/20 13:37:42 [PG] connected

Further Improvements

The mailroom system we are building is relatively basic. Advanced streaming solutions often employ more sophisticated batching strategies.

Priority Queues

Our current PostgreSQL-based queueing mechanism processes tokens without distinguishing between their types, so it can't prioritize critical emails, such as password recovery, over less urgent ones like account activations. At present, a batch of 10 emails could be all of one type or a mix, depending on arrival order. While effective, this design leaves room for improvement through prioritization or smarter batching strategies.

Adaptive Batching

User activity is rarely consistent. There are likely to be bursts of high activity, far exceeding our daily/hourly quotas, interspersed with periods of minimal activity.

Rather than using fixed limits and timeouts, we can adjust batch size and timeout values dynamically in response to real-time conditions. During low-traffic periods, the batch size can be increased to improve efficiency. During peak hours, it can be reduced to minimize delays. We could even incorporate machine learning to automate these adjustments.
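As one concrete take on adapting to real-time conditions, here is a hypothetical sketch that adjusts the timeout rather than the batch size: it spreads whatever budget remains in the billing period evenly over the time left, then clamps the result. adaptive_timeout_ms() and its parameters are assumptions for illustration, not part of mailroom.

```c
/*
 * Hypothetical budget-aware timeout: time needed to "earn" one batch at the
 * rate the remaining budget allows, clamped to [min_ms, max_ms].
 */
long adaptive_timeout_ms(double emails_remaining, double seconds_remaining,
                         int batch_limit, long min_ms, long max_ms)
{
  if (emails_remaining <= 0.0 || seconds_remaining <= 0.0)
    return max_ms; /* budget exhausted or period over: back off fully */

  double rate = emails_remaining / seconds_remaining; /* emails per second */
  double ms = batch_limit / rate * 1000.0;

  if (ms < (double)min_ms)
    return min_ms;
  if (ms > (double)max_ms)
    return max_ms;
  return (long)ms;
}
```

At the start of a month with the full $100 budget (1,000,000 emails over 2,592,000 seconds) and batches of 10, it returns roughly 25,920 ms, in line with the static calculation earlier. If the budget runs low, the timeout grows toward max_ms; if there is slack near the end of the period, it shrinks toward min_ms.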

Next

In the upcoming Part 3, we'll build the sender component to handle email delivery with rate-limiting.