Harnessing PostgreSQL notifications and libpq async API to drive a real-time token processing pipeline with backpressure.
Previously, we explored triggers in PostgreSQL and their role in managing a token queue. Although we lightly incorporated NOTIFY statements, we haven't fully examined their power yet. In this segment, we'll bring notification events into focus and build a collector to handle them.
Setting Up Your Environment
Clone the tetsuo/mailroom repository:
git clone https://github.com/tetsuo/mailroom.git
Run the following command to create a new database in PostgreSQL:
createdb mailroom
Then, navigate to the migrations folder and run:
psql -d mailroom < 0_init.up.sql
Alternatively, you can use go-migrate, which is often my preference.
Inspect the Initial State
Before adding any mock data, let's take a look at the initial state of the jobs table:
psql -d mailroom -c "SELECT * FROM jobs;"
You should see one row with job_type set to mailroom and last_seq set to zero:
job_type | last_seq
----------+----------
mailroom | 0
(1 row)
Start Listening for Updates
In a separate terminal, connect to the database again:
psql -d mailroom
Then enable listening on the token_insert channel:
LISTEN token_insert;
Populate with Sample Data
Now, let's insert some dummy data into the accounts table. This command inserts 3 records with randomized email and login fields:
printf "%.0sINSERT INTO accounts (email, login) VALUES ('user' || md5(random()::text) || '@fake.mail', 'user' || substr(md5(random()::text), 1, 20));\n" {1..3} | \
psql -d mailroom
Here's an example account record:
-[ ACCOUNT 1 ]-------------------------------------------------------------------
id | 1
email | usere3213152e8cdf722466a011b1eaa3c98@fake.mail
status | provisioned
login | user85341405cb33cbe89a5f
created_at | 1735709763
status_changed_at |
activated_at |
suspended_at |
unsuspended_at |
The corresponding token record generated by the trigger function:
-[ TOKEN 1 ]---------------------------------------------------------------------
id | 1
action | activation
secret | \x144d3ba23d4e60f80d3cb5cf25783539ba267af34aecd71d7cc888643c912fb7
code | 06435
account | 1
expires_at | 1735710663
consumed_at |
created_at | 1735709763
If the notifications haven't appeared yet, try issuing a semicolon (;) in the psql client to display them:
mailroom=# LISTEN token_insert;
LISTEN
mailroom=# ;
Asynchronous notification "token_insert" received from server process with PID 5148.
Asynchronous notification "token_insert" received from server process with PID 5148.
Asynchronous notification "token_insert" received from server process with PID 5148.
Dequeuing Pending Jobs
These notifications indicate that it's time to execute the query we defined in Part 1:
WITH token_data AS (
SELECT
t.account,
t.secret,
t.code,
t.expires_at,
t.id,
t.action,
a.email,
a.login
FROM
jobs
JOIN tokens t
ON t.id > jobs.last_seq
AND t.expires_at > EXTRACT(EPOCH FROM NOW())
AND t.consumed_at IS NULL
AND t.action IN ('activation', 'password_recovery')
JOIN accounts a
ON a.id = t.account
AND (
(t.action = 'activation' AND a.status = 'provisioned')
OR (t.action = 'password_recovery' AND a.status = 'active')
)
WHERE
jobs.job_type = 'mailroom'
ORDER BY id ASC
LIMIT 10
),
updated_jobs AS (
UPDATE
jobs
SET
last_seq = (SELECT MAX(id) FROM token_data)
WHERE
EXISTS (SELECT 1 FROM token_data)
RETURNING last_seq
)
SELECT
td.action,
td.email,
td.login,
td.secret,
td.code
FROM
token_data td
... which accomplishes two things:
- Retrieves tokens generated after the last_seq along with the corresponding user data
- Updates the last_seq value to avoid selecting duplicates later
In other words, this query retrieves a batch of tokens and advances the cursor:
-[ RECORD 1 ]--------------------------------------------------------------
action | activation
email | usere3213152e8cdf722466a011b1eaa3c98@fake.mail
login | user85341405cb33cbe89a5f
secret | \x144d3ba23d4e60f80d3cb5cf25783539ba267af34aecd71d7cc888643c912fb7
code | 06435
-[ RECORD 2 ]--------------------------------------------------------------
action | activation
email | user41e8b6830c76870594161150051f8215@fake.mail
login | user2491d87beb8950b4abd7
secret | \x27100e07220b62e849e788e6554fede60c96e967c4aa62db7dc45150c51be23f
code | 80252
-[ RECORD 3 ]--------------------------------------------------------------
action | activation
email | user7bb11e235c85afe12076884d06910be4@fake.mail
login | user91ab8536cb05c37ff46a
secret | \xa9763eec727835bd97b79018b308613268d9ea0db70493fd212771c9b7c3bcb2
code | 31620
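The cursor behavior can also be pictured outside of SQL. The following is a minimal in-memory sketch, not the collector's code: struct job and dequeue_batch are hypothetical names, and the expiry and account-status filters from the query are left out for brevity.

```c
#include <stddef.h>

/* Hypothetical in-memory stand-in for the jobs row: just the cursor. */
struct job
{
    long last_seq;
};

/* Copies up to `limit` ids greater than job->last_seq from `ids` (sorted
 * ascending) into `out`, then advances the cursor to the largest id taken.
 * Returns the number of ids copied; the cursor is untouched when it is 0. */
static size_t dequeue_batch(struct job *job, const long *ids, size_t n_ids,
                            long *out, size_t limit)
{
    size_t taken = 0;

    for (size_t i = 0; i < n_ids && taken < limit; i++)
    {
        if (ids[i] > job->last_seq)
        {
            out[taken++] = ids[i];
        }
    }
    if (taken > 0)
    {
        job->last_seq = out[taken - 1]; /* same role as MAX(id) in the query */
    }
    return taken;
}
```

Calling it repeatedly with the same id list returns successive batches and then nothing, which mirrors how the query skips rows that were already handed out.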
Collector: Design
Without notifications, we would need to query the database periodically to check for any pending jobs (like the batch above). While this can be okay for high-traffic scenarios, if token insertions are rare, continuous polling just eats up resources. By listening for database events and counting them instead, we can build a collector that responds in near real-time, knows exactly how many rows to dequeue, and queries the database only when there's actual work to do.
On the flip side, the collector must also account for constraints imposed by our email provider—and, by extension, our budget. For instance, with Amazon SES charging $0.10 per 1,000 emails, a monthly budget of $100 translates to:
- 1,000,000 emails per month,
- 33,333 emails per day,
- 1,389 emails per hour,
- 23 emails per minute, and
- 0.38 emails per second.
At this rate, we would need to buffer for approximately 26 seconds for batches containing 10 destinations each:
10 emails / 0.38 emails per second ≈ 26.32 seconds
... which assumes we are operating at full capacity within our budget.
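To make the arithmetic easy to repeat with other budgets, here is a small sketch. The function names are illustrative, and the 30-day month is an assumption that matches the rounding in the list above. Without rounding the rate down to 0.38, the interval comes out slightly shorter, at roughly 25.9 seconds.

```c
#include <stdio.h>

/* Emails per second affordable under a monthly budget.
 * days_per_month is an assumption; 30 matches the figures above. */
static double emails_per_second(double budget_usd, double usd_per_1000, int days_per_month)
{
    double emails_per_month = budget_usd / usd_per_1000 * 1000.0;
    return emails_per_month / (days_per_month * 24.0 * 60.0 * 60.0);
}

/* Seconds that must pass, at the given rate, to cover one full batch. */
static double seconds_per_batch(int batch_size, double rate_per_second)
{
    return batch_size / rate_per_second;
}

/* Prints the derived rate and batch interval for the $100 example. */
static void print_budget_example(void)
{
    double rate = emails_per_second(100.0, 0.10, 30);
    printf("%.3f emails/s, %.1f s per batch of 10\n", rate, seconds_per_batch(10, rate));
}
```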
Pull-based Flow-control
To manage these considerations, the system implements a form of backpressure. It doesn't do so in a "push-back to the producer" sense that some streaming frameworks (like Kafka) might use, but rather by pulling tokens from the database in controlled batches and only fetching more when the collector is ready. In other words, the collector imposes a flow-control mechanism on itself to avoid overwhelming the downstream (email-sending) component.
Here's how it works in practice:
Batch Limit
The maximum number of email destinations in a single batch.
The collector queries the database for at most N tokens at a time (where N is the batch limit). Even if 500 tokens are waiting in the database, the collector will only take, say, 10 at a time. This imposes a hard cap on the throughput of tokens that can leave the database and head downstream.
Batch Timeout
The time to wait for accumulating enough notifications to fill a batch.
Instead of hammering the database every millisecond or immediately after a few notifications arrive, the collector also waits up to X milliseconds before processing tokens (where X is the batch timeout). If fewer than the batch limit have arrived during that window, the collector will still dequeue whatever did arrive—but it won't keep pulling more immediately. In effect, this ensures that surges of tokens are smoothed out over time (i.e., "batched").
Example
If you set:
- A batch timeout of 30 seconds.
- A limit of 10 notifications.
This means:
- If 10 notifications arrive in quick succession, the batch is triggered immediately.
- If fewer than 10 arrive over 30 seconds, the batch is triggered when the timeout ends.
In many cases, users are fine waiting a short time for an email to arrive. Personally, I wouldn't be too concerned if my password recovery email arrived within 30 seconds, though waiting a full minute might feel excessive. Note that the batch timeout effectively sets an upper limit on how long new tokens can linger before being handed over to the email sender.
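The two rules can be captured in a few lines of state. This is a simplified sketch rather than the collector's actual code: struct batcher and its functions are hypothetical names, and time is passed in explicitly so the logic is easy to follow.

```c
#include <stdbool.h>

/* Hypothetical batching state; names are illustrative. */
struct batcher
{
    int limit;       /* batch limit: notifications that trigger a flush */
    long timeout_ms; /* batch timeout */
    int seen;        /* notifications counted since the last flush */
    long start_ms;   /* arrival time of the first pending notification */
};

/* Records one notification at time now_ms.
 * Returns true when the batch limit has been reached. */
static bool batcher_notify(struct batcher *b, long now_ms)
{
    if (b->seen == 0)
    {
        b->start_ms = now_ms; /* the first notification starts the timer */
    }
    b->seen++;
    return b->seen >= b->limit;
}

/* Returns true when notifications are pending and the timeout has elapsed. */
static bool batcher_timed_out(const struct batcher *b, long now_ms)
{
    return b->seen > 0 && now_ms - b->start_ms >= b->timeout_ms;
}

/* Returns the number of rows to dequeue and resets the counter. */
static int batcher_flush(struct batcher *b)
{
    int n = b->seen;
    b->seen = 0;
    return n;
}
```

With a limit of 10 and a timeout of 30,000 ms, the tenth call to batcher_notify returns true right away, while a single notification only becomes due once batcher_timed_out sees 30 seconds pass.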
Collector: Implementation
The collector is written in C and interacts with PostgreSQL using libpq.
Connecting to PostgreSQL
The query from Part 1 lives in the db.c file, along with other DB-related functions. When the collector first connects, it issues a LISTEN command on the specified channel and creates the prepared statements for subsequent queries.
// Creates a prepared statement to be reused for efficient database queries.
static bool db_prepare_statement(PGconn *conn, const char *stmt_name, const char *query)
{
// nParams is 0 and paramTypes is NULL, so the server infers the parameter types.
PGresult *res = PQprepare(conn, stmt_name, query, 0, NULL);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
PQclear(res);
return false;
}
PQclear(res);
return true;
}
// Executes a LISTEN command on a specified channel to receive database
// notifications in real time.
static bool db_listen(PGconn *conn, const char *channel)
{
char *escaped_channel = PQescapeIdentifier(conn, channel, strlen(channel));
if (!escaped_channel)
{
return false;
}
size_t command_len = strlen("LISTEN ") + strlen(escaped_channel) + 1;
char listen_command[command_len];
snprintf(listen_command, command_len, "LISTEN %s", escaped_channel);
PQfreemem(escaped_channel);
PGresult *res = PQexec(conn, listen_command);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
PQclear(res);
return false;
}
PQclear(res);
return true;
}
// Establishes a connection to the database, listens for notifications, and
// creates prepared statements.
bool db_connect(PGconn **conn, const char *conninfo, const char *channel)
{
*conn = PQconnectdb(conninfo);
return PQstatus(*conn) == CONNECTION_OK &&
db_listen(*conn, channel) &&
db_prepare_statement(*conn, POSTGRES_HEALTHCHECK_PREPARED_STMT_NAME, "SELECT 1") &&
db_prepare_statement(*conn, POSTGRES_DATA_PREPARED_STMT_NAME, token_data);
}
Fetching & Serializing Email Payloads
When notifications arrive, the collector executes the query to fetch tokens in batches. Then it writes the results directly to stdout. Processing continues until it has exhausted the queued tokens or an error occurs.
// Fetches and processes email payloads from the database.
static int _db_dequeue(PGconn *conn, const char *queue, int limit)
{
static const char *params[2];
static char limitstr[12];
PGresult *res = NULL;
int action_col, email_col, login_col, code_col, secret_col;
char *action, *email, *login, *code, *secret_text;
unsigned char *secret = NULL;
size_t secret_len;
int nrows;
static char signature_buffer[SIGNATURE_MAX_INPUT_SIZE]; // Input to sign
static unsigned char hmac_result[HMAC_RESULT_SIZE]; // HMAC output
static unsigned char combined_buffer[CONCATENATED_SIZE]; // secret + HMAC
static char base64_encoded[BASE64_ENCODED_SIZE]; // Base64-encoded output
size_t hmac_len = 0;
snprintf(limitstr, sizeof(limitstr), "%d", limit);
params[0] = queue;
params[1] = limitstr;
res = PQexecPrepared(conn, POSTGRES_DATA_PREPARED_STMT_NAME, 2, params, NULL, NULL, 0);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
log_printf("ERROR: query execution failed: %s", PQerrorMessage(conn));
PQclear(res);
return -1;
}
nrows = PQntuples(res);
if (nrows == 0)
{
PQclear(res);
return 0;
}
action_col = PQfnumber(res, "action");
email_col = PQfnumber(res, "email");
login_col = PQfnumber(res, "login");
code_col = PQfnumber(res, "code");
secret_col = PQfnumber(res, "secret");
if (action_col == -1 || email_col == -1 || login_col == -1 ||
code_col == -1 || secret_col == -1)
{
log_printf("FATAL: missing columns in the result set");
PQclear(res);
return -2;
}
size_t signature_len;
int action_id;
int emitted = 0; // Rows written so far; decides whether a separator is needed
for (int i = 0; i < nrows; i++)
{
action = PQgetvalue(res, i, action_col);
email = PQgetvalue(res, i, email_col);
login = PQgetvalue(res, i, login_col);
code = PQgetvalue(res, i, code_col);
secret_text = PQgetvalue(res, i, secret_col);
secret = PQunescapeBytea((unsigned char *)secret_text, &secret_len);
if (!secret || secret_len != 32)
{
log_printf("WARN: skipping row; PQunescapeBytea failed or invalid secret length");
if (secret)
{
PQfreemem(secret); // Free a decoded buffer of the wrong length
}
continue;
}
signature_len = construct_signature_data(signature_buffer, action, secret, code);
hmac_len = HMAC_RESULT_SIZE;
if (!hmac_sign(signature_buffer, signature_len, hmac_result, &hmac_len))
{
log_printf("WARN: skipping row; HMAC signing failed");
PQfreemem(secret);
continue;
}
memcpy(combined_buffer, secret, 32);
memcpy(combined_buffer + 32, hmac_result, hmac_len);
PQfreemem(secret);
if (!base64_urlencode(base64_encoded, sizeof(base64_encoded), combined_buffer, 32 + hmac_len))
{
log_printf("WARN: skipping row; base64 encoding failed");
continue;
}
// Map the action to its numeric code: 1 = activation, 2 = password_recovery, 0 = unknown
if (strcmp(action, "activation") == 0)
{
action_id = 1;
}
else if (strcmp(action, "password_recovery") == 0)
{
action_id = 2;
}
else
{
action_id = 0;
}
// Write the row only after the payload is complete, so a skipped row
// never leaves a partial record or a dangling comma on stdout.
printf("%s%d,%s,%s,%s,%s", emitted > 0 ? "," : "", action_id, email, login, base64_encoded, code);
emitted++;
}
printf("\n");
fflush(stdout);
PQclear(res);
return nrows;
}
// Ensure no more than the batch limit is dequeued.
int db_dequeue(PGconn *conn, const char *queue, int remaining, int max_chunk_size)
{
int result = 0;
int chunk_size = 0;
int total = 0;
while (remaining > 0)
{
chunk_size = remaining > max_chunk_size ? max_chunk_size : remaining;
result = _db_dequeue(conn, queue, chunk_size);
if (result < 0)
{
return result;
}
total += result;
remaining -= chunk_size;
sleep_microseconds(10000); // 10ms
}
return total;
}
Signing & Verifying Tokens
One key transformation during the dequeue operation involves signing the token's secret with HMAC-SHA256 and encoding the result in Base64 URL-safe format.
The signature is computed over:
- A path name (e.g., /activate or /recover)
- The original secret (and code, in the case of recovery)
The encoded output then contains the original secret followed by the resulting cryptographic signature.
static size_t construct_signature_data(char *output, const char *action,
const unsigned char *secret, const char *code)
{
size_t offset = 0;
if (strcmp(action, "activation") == 0)
{
memcpy(output, "/activate", 9); // "/activate" is 9 bytes
offset = 9;
memcpy(output + offset, secret, 32);
offset += 32;
}
else if (strcmp(action, "password_recovery") == 0)
{
memcpy(output, "/recover", 8); // "/recover" is 8 bytes
offset = 8;
memcpy(output + offset, secret, 32);
offset += 32;
memcpy(output + offset, code, 5); // code is 5 bytes
offset += 5;
}
return offset; // Total length of the constructed data
}
This step ensures authenticity checks can happen on the frontend without needing an immediate database call. If you'd like to see how the backend verifies these secrets, there is a verifyHmac.js script in the repo for reference.
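The base64_urlencode helper called during dequeue is not shown in this post. As a rough idea of what such a step involves, here is a minimal unpadded URL-safe Base64 encoder following RFC 4648. The name b64url_encode and its return convention are assumptions, not the repo's implementation.

```c
#include <stddef.h>

/* Encodes `len` bytes from `in` as unpadded URL-safe Base64 (RFC 4648, section 5).
 * Writes a NUL-terminated string to `out` and returns its length.
 * Returns 0 if `out_size` is too small (and also for empty input).
 * Hypothetical helper for illustration. */
static size_t b64url_encode(char *out, size_t out_size,
                            const unsigned char *in, size_t len)
{
    static const char alphabet[] =
        "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_";
    size_t needed = (len * 4 + 2) / 3; /* output length without padding */
    size_t o = 0;

    if (out_size < needed + 1)
    {
        return 0;
    }
    for (size_t i = 0; i < len; i += 3)
    {
        size_t rem = len - i; /* bytes left, including this group */
        unsigned long v = (unsigned long)in[i] << 16;

        if (rem > 1)
        {
            v |= (unsigned long)in[i + 1] << 8;
        }
        if (rem > 2)
        {
            v |= (unsigned long)in[i + 2];
        }
        out[o++] = alphabet[(v >> 18) & 0x3F];
        out[o++] = alphabet[(v >> 12) & 0x3F];
        if (rem > 1)
        {
            out[o++] = alphabet[(v >> 6) & 0x3F];
        }
        if (rem > 2)
        {
            out[o++] = alphabet[v & 0x3F];
        }
    }
    out[o] = '\0';
    return o;
}
```

A 32-byte secret followed by a 32-byte HMAC-SHA256 digest is 64 bytes, which encodes to 86 characters without padding.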
Remember to handle expired tokens. One method is to include the expires_at value so you can check validity without a DB lookup. But if tokens remain valid for 15 minutes, a more thorough approach is to cache consumed tokens until they naturally expire, preventing reuse during their validity period.
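One way to picture the consumed-token cache is a fixed-size table keyed by the secret. This is only a sketch under several assumptions: the type and function names are hypothetical, the capacity is arbitrary, lookups are a linear scan, and a real verifier would more likely use a hash map or an external cache.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

#define SECRET_LEN 32
#define CACHE_SLOTS 1024 /* illustrative capacity */

struct consumed_entry
{
    unsigned char secret[SECRET_LEN];
    long expires_at; /* Unix seconds; 0 marks a free slot */
};

struct consumed_cache
{
    struct consumed_entry slots[CACHE_SLOTS];
};

/* Returns true if `secret` was consumed and has not yet expired. */
static bool cache_is_consumed(const struct consumed_cache *c,
                              const unsigned char *secret, long now)
{
    for (size_t i = 0; i < CACHE_SLOTS; i++)
    {
        const struct consumed_entry *e = &c->slots[i];
        if (e->expires_at > now && memcmp(e->secret, secret, SECRET_LEN) == 0)
        {
            return true;
        }
    }
    return false;
}

/* Remembers `secret` until `expires_at`, reusing a free or expired slot.
 * Returns false when every slot holds a live entry. */
static bool cache_mark_consumed(struct consumed_cache *c, const unsigned char *secret,
                                long expires_at, long now)
{
    for (size_t i = 0; i < CACHE_SLOTS; i++)
    {
        struct consumed_entry *e = &c->slots[i];
        if (e->expires_at <= now)
        {
            memcpy(e->secret, secret, SECRET_LEN);
            e->expires_at = expires_at;
            return true;
        }
    }
    return false;
}
```

Because expired entries are treated as free slots, the table cleans itself up as tokens age out, so no separate eviction pass is needed.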
Also, remember to rotate your signing key often.
Putting It All Together
Environment Variables
In main.c, you'll see references to environment variables like MAILROOM_BATCH_TIMEOUT, MAILROOM_BATCH_LIMIT, and MAILROOM_SECRET_KEY (a 64-character hex string). Consult the README file for the full list.
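A 64-character hex string corresponds to 32 raw bytes of key material. The sketch below shows one way such a value could be decoded and validated. The hex_decode and hex_value names are hypothetical, and the repo may handle this differently.

```c
#include <stddef.h>
#include <string.h>

/* Returns the value of one hex digit, or -1 if `c` is not a hex digit. */
static int hex_value(char c)
{
    if (c >= '0' && c <= '9')
    {
        return c - '0';
    }
    if (c >= 'a' && c <= 'f')
    {
        return c - 'a' + 10;
    }
    if (c >= 'A' && c <= 'F')
    {
        return c - 'A' + 10;
    }
    return -1;
}

/* Decodes exactly 2 * out_len hex characters from `hex` into `out`.
 * Returns 1 on success, 0 if `hex` is NULL, has the wrong length,
 * or contains a non-hex character. */
static int hex_decode(const char *hex, unsigned char *out, size_t out_len)
{
    if (!hex || strlen(hex) != out_len * 2)
    {
        return 0;
    }
    for (size_t i = 0; i < out_len; i++)
    {
        int hi = hex_value(hex[2 * i]);
        int lo = hex_value(hex[2 * i + 1]);
        if (hi < 0 || lo < 0)
        {
            return 0;
        }
        out[i] = (unsigned char)((hi << 4) | lo);
    }
    return 1;
}
```

In practice, the input would come from something like getenv("MAILROOM_SECRET_KEY"), and a failed decode would be a reason to refuse to start.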
The Loop
At a high level, the main loop repeatedly:
- Dequeues and processes ready batches
- Checks for new notifications
- Waits on select() for either database activity or a timeout
- Performs periodic health checks
- Reconnects to the database if needed
When the batch limit is reached or the timeout occurs, the collector executes the dequeue query. If it detects a broken connection, it attempts to reconnect and resumes once stable.
Here's the pseudo-code representation:
// 🌟 Main processing loop
WHILE the application is running 🔄
// 🔌 Handle reconnection if needed
IF the connection is not ready ❌ THEN
reconnect to the database 🔄
initialize the connection ✅
reset counters 🔢
CONTINUE to the next iteration ⏩
END IF
// 📦 Process ready batches
IF ready for processing ✅ THEN
dequeue and process a batch of items 📤
reset state for the next cycle 🔁
CONTINUE to the next iteration ⏩
END IF
// 🛎️ Handle pending notifications
process all incoming notifications 📥
IF notifications reach the batch limit 🚨 THEN
mark ready for processing ✅
CONTINUE to the next iteration ⏩
END IF
// ⏱️ Wait for new events or timeout
wait for activity on the connection 📡 or timeout ⌛
IF interrupted by a signal 🚨 THEN
handle the signal (e.g., shutdown) ❌
CONTINUE to the next iteration ⏩
ELSE IF timeout occurs ⏳ THEN
IF notifications exist 📋 THEN
mark ready for processing ✅
CONTINUE to the next iteration ⏩
END IF
perform periodic health checks 🩺
END IF
// 🛠️ Consume available data
consume data from the connection 📶
prepare for the next cycle 🔁
END WHILE
And here's the actual implementation:
int result;
PGconn *conn = NULL;
fd_set active_fds, read_fds;
int sock;
struct timeval tv;
int seen = 0;
PGnotify *notify = NULL;
int rc = 0;
long start = get_current_time_ms();
long now, elapsed, remaining_ms;
long last_healthcheck = start;
int ready = -1;
while (running)
{
if (ready < 0)
{
if (conn)
{
PQfinish(conn);
}
if (!db_connect(&conn, conninfo, channel_name))
{
log_printf("ERROR: connection failed: %s", PQerrorMessage(conn));
return exit_code(conn, EXIT_FAILURE);
}
log_printf("connected");
while (running && (result = db_dequeue(conn, queue_name, batch_limit, batch_limit)) == batch_limit)
;
if (result < 0)
{
return exit_code(conn, EXIT_FAILURE);
}
FD_ZERO(&active_fds);
sock = PQsocket(conn);
FD_SET(sock, &active_fds);
seen = 0;
ready = 0;
last_healthcheck = get_current_time_ms();
continue;
}
else if (ready > 0)
{
result = db_dequeue(conn, queue_name, seen, batch_limit);
if (result == -2)
{
return exit_code(conn, EXIT_FAILURE);
}
else if (result == -1)
{
log_printf("WARN: forcing reconnect...");
ready = -1;
continue;
}
else if (result != seen)
{
log_printf("WARN: expected %d items to be processed, got %d", seen, result);
}
seen = 0;
ready = 0;
last_healthcheck = get_current_time_ms();
}
// Process any pending notifications before select()
while (running && (notify = PQnotifies(conn)) != NULL)
{
PQfreemem(notify);
if (seen == 0)
{
log_printf("NOTIFY called; waking up");
start = get_current_time_ms(); // Received first notification; reset timer
}
seen++;
PQconsumeInput(conn);
}
if (seen >= batch_limit)
{
log_printf("processing %d rows... (max reached)", seen);
ready = 1;
continue; // Skip select() and process immediately
}
now = get_current_time_ms();
elapsed = now - start;
remaining_ms = timeout_ms - elapsed;
if (remaining_ms < 0)
{
remaining_ms = 0;
}
tv.tv_sec = remaining_ms / 1000;
tv.tv_usec = (remaining_ms % 1000) * 1000;
read_fds = active_fds;
rc = select(sock + 1, &read_fds, NULL, NULL, &tv);
if (rc < 0)
{
if (errno == EINTR)
{
if (!running)
{
break;
}
log_printf("WARN: select interrupted by signal");
continue;
}
log_printf("ERROR: select failed: %s (socket=%d)", strerror(errno), sock);
break;
}
else if (rc == 0)
{ // Timeout occurred;
start = get_current_time_ms(); // Reset the timer
if (seen > 0)
{
log_printf("processing %d rows... (timeout)", seen);
ready = 1;
continue;
}
if ((sock = PQsocket(conn)) < 0)
{
log_printf("WARN: socket closed; %s", PQerrorMessage(conn));
ready = -1;
continue;
}
if (now - last_healthcheck >= healthcheck_ms)
{
if (!db_healthcheck(conn))
{
ready = -1;
continue;
}
else
{
last_healthcheck = start;
}
}
}
if (!FD_ISSET(sock, &read_fds))
{
continue;
}
do
{
if (!PQconsumeInput(conn))
{
log_printf("WARN: error consuming input: %s", PQerrorMessage(conn));
if (PQstatus(conn) != CONNECTION_OK)
{
ready = -1;
break;
}
}
} while (running && PQisBusy(conn));
}
select() in a Nutshell
The select() system call is central to how the program operates. It's a UNIX mechanism that monitors file descriptors (like sockets) to determine if they're ready for I/O operations (e.g., reading or writing).
In this code, select() is used to:
- Monitor the socket for new notifications
- Enforce a timeout for batch processing
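Both roles can be seen in isolation with a small helper that works on any file descriptor. This is a standalone sketch, not code from the collector: wait_readable is a hypothetical name, and it uses the same millisecond-to-timeval conversion as the main loop.

```c
#include <stddef.h>
#include <sys/select.h>
#include <sys/time.h>

/* Waits until `fd` is readable or `timeout_ms` elapses.
 * Returns 1 if readable, 0 on timeout, and -1 on error (errno is set by
 * select(); EINTR means the call was interrupted by a signal). */
static int wait_readable(int fd, long timeout_ms)
{
    fd_set read_fds;
    struct timeval tv;
    int rc;

    if (timeout_ms < 0)
    {
        timeout_ms = 0;
    }
    tv.tv_sec = timeout_ms / 1000;
    tv.tv_usec = (timeout_ms % 1000) * 1000;

    FD_ZERO(&read_fds);
    FD_SET(fd, &read_fds);

    rc = select(fd + 1, &read_fds, NULL, NULL, &tv);
    if (rc < 0)
    {
        return -1;
    }
    if (rc == 0)
    {
        return 0; /* nothing arrived before the deadline */
    }
    return FD_ISSET(fd, &read_fds) ? 1 : 0;
}
```

With a pipe in place of the PostgreSQL socket, the call returns 0 while the pipe is empty and 1 as soon as a byte is written to the other end.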
PQconsumeInput and PQnotifies
After select() signals that data is ready, the collector calls PQconsumeInput, which reads data into libpq's internal buffers. It then calls PQnotifies to retrieve any pending notifications and increment the counter.
Read more about libpq's async API.
Compile & Run
To compile, verify that you have openssl@3 and libpq@5 installed, then use the provided Makefile.
Run the Collector
Use the following command to build and run the collector executable with example configuration variables:
make && \
MAILROOM_BATCH_LIMIT=3 \
MAILROOM_BATCH_TIMEOUT=5000 \
MAILROOM_DATABASE_URL="dbname=mailroom" \
MAILROOM_SECRET_KEY="cafebabecafebabecafebabecafebabecafebabecafebabecafebabecafebabe" \
./collector
Once configured and started, it will log its activity:
2024/04/20 13:37:00 [PG] configured; channel=token_insert queue=mailroom limit=3 timeout=5000ms healthcheck-interval=270000ms
2024/04/20 13:37:00 [PG] connecting to host=/tmp port=5432 dbname=mailroom user=ogu sslmode=disable
2024/04/20 13:37:00 [PG] connected
Insert Accounts and Observe Batching
In another terminal, insert 5 accounts:
printf "%.0sINSERT INTO accounts (email, login) VALUES ('user' || md5(random()::text) || '@fake.mail', 'user' || substr(md5(random()::text), 1, 20));\n" {1..5} | \
psql -d mailroom
You'll observe the collector immediately process the first batch of three items. After a 5-second delay (as defined by the MAILROOM_BATCH_TIMEOUT), it processes the remaining two in a second batch:
2024/04/20 13:37:00 [PG] NOTIFY called; waking up
2024/04/20 13:37:00 [PG] processing 3 rows... (max reached)
1,userb183abb7a25d04027061e6b8d8d8e7fa@fake.mail,userb0bf075b82b892f53d97,gVRNesi-opSvs3ntPfr9DzSn_JwbOD04VVIurQSCOFzzd3BOM3WBDL3SOtDjMxKLd6csSn8_p9hemXHIUxIjPg,78092,1,user43b01ba9686c886473e526429dd2c672@fake.mail,userf420078dba4fd5a91de2,--DTy5LsbDeLP_AweXIPSjL3_avQMT5cH_bRxPy1uxQLVhXKaw7Oxd7NYkcJ6MZmnnqWqTcBPHA5z7bqunXEAA,25778,1,user46f81dfd34b91a1904ac4524193575aa@fake.mail,user6d91baab56d2823b326d,ryooWewe3OTxIGF1Gjl5Vvl8BsXoqWVbCAt1t6J--_KX1SM4DbyCes4yn75OWVe60G4MMZdv4byRh1wy-Clvxw,78202
2024/04/20 13:37:00 [PG] NOTIFY called; waking up
2024/04/20 13:37:05 [PG] processing 2 rows... (timeout)
1,user12d2722e1c07b0a531ea69ae125d4697@fake.mail,user853ae29eefc5d44a6bc6,4pmew2o2EOAZBDHWvJBcixJftpRCb8uyXZhzN12EOcrLBmzc4ic9avwd9dla09pIiKIoqW5iIwMfoXLEM3_LGw,38806,1,user9497d0e033019fcf3198eecb053ba40e@fake.mail,userfcde338dba96cc419613,ANLMa-1y37VLCDqK0wnfEFhUVzHsWpaNGV2ttI8m3o6_lbbYOKmp3hP7Q8H8ZQRNMPAj4xsSqC26nesfVZLgzQ,89897
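Each output line is a flat, comma-separated list with five fields per token: action code, email, login, encoded secret with signature, and code. A downstream reader could split it along these lines. The parse_batch_line function and struct payload are hypothetical names, and the sketch assumes that emails and logins never contain commas, since the collector does not escape them.

```c
#include <stddef.h>
#include <string.h>

#define FIELDS_PER_RECORD 5

/* One email payload; pointers refer into the (modified) input line. */
struct payload
{
    const char *action; /* "1" = activation, "2" = password recovery */
    const char *email;
    const char *login;
    const char *token; /* URL-safe Base64 of secret + signature */
    const char *code;
};

/* Splits `line` in place on commas and fills up to `max` payloads.
 * Returns the number of records, or -1 if the field count is not a
 * multiple of five or there are more than `max` records. */
static int parse_batch_line(char *line, struct payload *out, size_t max)
{
    const char *fields[FIELDS_PER_RECORD];
    size_t nfields = 0, nrecords = 0;
    char *p = line;

    line[strcspn(line, "\n")] = '\0'; /* drop the trailing newline */
    if (*line == '\0')
    {
        return 0;
    }
    for (;;)
    {
        char *comma = strchr(p, ',');
        if (comma)
        {
            *comma = '\0';
        }
        fields[nfields++] = p;
        if (nfields == FIELDS_PER_RECORD)
        {
            if (nrecords == max)
            {
                return -1;
            }
            out[nrecords].action = fields[0];
            out[nrecords].email = fields[1];
            out[nrecords].login = fields[2];
            out[nrecords].token = fields[3];
            out[nrecords].code = fields[4];
            nrecords++;
            nfields = 0;
        }
        if (!comma)
        {
            break;
        }
        p = comma + 1;
    }
    return nfields == 0 ? (int)nrecords : -1;
}
```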
Testing Reconnect Behavior
To simulate a dropped connection, open another terminal, connect to mailroom via psql, and run:
SELECT pg_terminate_backend(pid)
FROM pg_stat_activity
WHERE
datname = 'mailroom'
AND pid <> pg_backend_pid();
After the connection is killed, select() wakes up, causing PQconsumeInput() to fail with an error. The collector logs a reconnect attempt, and once reconnected, it resumes processing without losing track of queued tokens during the downtime.
2024/04/20 13:37:42 [PG] WARN: error consuming input: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
2024/04/20 13:37:42 [PG] connecting to host=/tmp port=5432 dbname=mailroom user=ogu sslmode=disable
2024/04/20 13:37:42 [PG] connected
Further Improvements
It's important to note that the mailroom system we are building is relatively basic. Industry-grade event streaming solutions often employ more sophisticated batching strategies.
Priority Queues
Our current PostgreSQL-based queueing mechanism processes tokens without distinguishing between their types and lacks the ability to prioritize critical ones, such as password recovery, over less urgent emails like account activations. At present, a batch of 10 emails could contain 10 emails of the same type or a mix. While effective, this design leaves room for improvement, such as introducing prioritization or smarter batching strategies.
Adaptive Batching
User activity is rarely consistent. There are likely to be bursts of high activity, far exceeding our daily/hourly quotas, interspersed with periods of minimal activity.
Rather than using fixed limits and timeouts, we can adjust batch size and timeout values dynamically in response to real-time conditions. During low-traffic periods, the batch size can be increased to improve efficiency. During peak hours, it can be reduced to minimize delays. We could even incorporate machine learning to automate these adjustments.
Next
In the upcoming Part 3, we'll build the sender component to handle email delivery with rate-limiting.