Building a Resilient Node.js Consumer for Alibaba Cloud RocketMQ


In our previous section “Implementing a Resilient Node.js Producer for Alibaba Cloud RocketMQ”, we built the edge-side Producer that captures offline-synced data and buffers it securely in Alibaba Cloud RocketMQ. Now we need to build the central cloud’s engine: the Consumer.

When the international gateways reopen and connectivity is restored, your RocketMQ topics will be flooded with delayed messages from edge nodes around the world. If your backend indiscriminately writes all these messages to your database, you risk data corruption, duplicate entries, and exhausting your database’s available connections.

The Consumer script sits securely in your central Alibaba Cloud VPC. Its job is to read these messages at a controlled pace, verify that the data hasn’t already been processed (idempotency), and securely write the final state to your PolarDB database.


The Core Concept: Idempotency at Scale


In distributed systems, especially those recovering from network failures, at-least-once delivery is the standard. This means RocketMQ guarantees your message will be delivered, but in rare network-flicker scenarios, it might deliver the same message twice.

Your Consumer must be idempotent. This means that no matter how many times it processes the same MessageKey (the local ID generated by the offline device), the end result in the database remains exactly the same, without throwing duplicate key errors or corrupting patient records.
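The property is easiest to see in a small, self-contained sketch: here an in-memory Map stands in for PolarDB, and applyMessage stands in for the worker’s insert logic (both names are illustrative, not part of the SDK):

```javascript
// Idempotency sketch: an in-memory Map stands in for PolarDB,
// and applyMessage is a stand-in for the real insert logic.
const store = new Map(); // local_record_id -> record

function applyMessage(msg) {
  // Processing the same MessageKey twice must leave the store unchanged.
  if (store.has(msg.MessageKey)) {
    return 'duplicate'; // already processed; safe to acknowledge and discard
  }
  store.set(msg.MessageKey, JSON.parse(msg.MessageBody));
  return 'inserted';
}

// Simulate at-least-once delivery: the same message arrives twice.
const msg = { MessageKey: 'rec-001', MessageBody: '{"patientId":"p-42"}' };
console.log(applyMessage(msg)); // 'inserted'
console.log(applyMessage(msg)); // 'duplicate' -- state is unchanged
```

However many times the duplicate arrives, the stored record is written exactly once; that is the behavior the real consumer below reproduces against PolarDB.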


Prerequisites for the Central Cloud


Before deploying this worker to an Alibaba Cloud ECS instance or Serverless App Engine (SAE), ensure you have:

  1. Node.js Environment: Configured in your central VPC.
  2. PolarDB Instance: A running Alibaba Cloud PolarDB cluster (MySQL-compatible) with a database and table ready to accept data.
  3. Required npm packages: npm install @alicloud/mq-http-sdk mysql2
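
A table shape that supports the idempotency check used later might look like the following sketch (column names mirror the script’s INSERT statement; the types, sizes, and index name are illustrative). The UNIQUE key on local_record_id is what makes duplicate detection reliable:

```sql
-- Hypothetical schema sketch; adapt types and sizes to your data.
CREATE TABLE patient_records (
  id BIGINT AUTO_INCREMENT PRIMARY KEY,
  local_record_id VARCHAR(64) NOT NULL,
  patient_id VARCHAR(64) NOT NULL,
  notes TEXT,
  device_timestamp DATETIME NOT NULL,
  -- The UNIQUE constraint lets the database itself reject duplicates
  UNIQUE KEY uq_local_record_id (local_record_id)
);
```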

The Node.js Consumer Script

This worker script continuously polls RocketMQ for new messages, checks PolarDB to see if the record already exists, and inserts the data if it is genuinely new.

const { MQClient } = require('@alicloud/mq-http-sdk');
const mysql = require('mysql2/promise');

// ==========================================
// 1. Configuration & Initializations
// ==========================================
// RocketMQ Config
const mqEndpoint = process.env.ROCKETMQ_ENDPOINT;
const accessKeyId = process.env.ALIBABA_ACCESS_KEY;
const accessKeySecret = process.env.ALIBABA_SECRET_KEY;
const instanceId = process.env.ROCKETMQ_INSTANCE_ID;
const topicName = process.env.ROCKETMQ_TOPIC;
const groupId = process.env.ROCKETMQ_GROUP_ID; // Consumer Group ID

// PolarDB (MySQL) Config
const dbConfig = {
  host: process.env.POLARDB_ENDPOINT,
  user: process.env.POLARDB_USER,
  password: process.env.POLARDB_PASSWORD,
  database: 'healthcare_db',
  waitForConnections: true,
  connectionLimit: 10, // Prevent overwhelming the database
  queueLimit: 0
};

// Initialize Clients
const mqClient = new MQClient(mqEndpoint, accessKeyId, accessKeySecret);
const consumer = mqClient.getConsumer(instanceId, topicName, groupId, 'SyncEvent'); // Filtering by Tag
const dbPool = mysql.createPool(dbConfig);

// ==========================================
// 2. The Consumer Worker Function
// ==========================================
async function startWorker() {
  console.log('[Consumer] Worker started. Polling RocketMQ...');

  while (true) {
    try {
      // Poll up to 3 messages at a time; long-poll up to 3 seconds if the queue is empty.
      // The SDK returns a response object whose body holds the message array.
      const res = await consumer.consumeMessage(3, 3);
      const messages = res.body || [];

      if (messages.length === 0) {
        continue; // No messages, loop again
      }

      for (const msg of messages) {
        const localRecordId = msg.MessageKey;
        const payload = JSON.parse(msg.MessageBody);
        
        console.log(`[Consumer] Processing Message: ${msg.MessageId} | Key: ${localRecordId}`);

        // --- IDEMPOTENCY CHECK ---
        // Check if this localRecordId already exists in PolarDB
        const [rows] = await dbPool.execute(
          'SELECT id FROM patient_records WHERE local_record_id = ?', 
          [localRecordId]
        );

        if (rows.length > 0) {
          console.log(`[Consumer] Duplicate detected for Key: ${localRecordId}. Ignoring payload.`);
        } else {
          // --- DATABASE INSERTION ---
          // Record is new, insert it into PolarDB safely using parameterized queries
          await dbPool.execute(
            `INSERT INTO patient_records (local_record_id, patient_id, notes, device_timestamp) 
             VALUES (?, ?, ?, ?)`,
            [
              localRecordId, 
              payload.patientData.patientId, 
              payload.patientData.notes, 
              new Date(payload.deviceTimestamp)
            ]
          );
          console.log(`[Consumer] Successfully inserted new record for Key: ${localRecordId}`);
        }

        // --- ACKNOWLEDGMENT ---
        // Crucial: Tell RocketMQ to delete the message so it isn't processed again
        const ackResult = await consumer.ackMessage([msg.ReceiptHandle]);
        if (ackResult.code !== 204) {
             console.warn(`[Consumer] Failed to acknowledge message: ${msg.MessageId}`);
        }
      }

    } catch (error) {
      // A 'MessageNotExist' error just means the queue was empty when the
      // long poll timed out. Ignore it and poll again.
      const errCode = error.Code || error.message || '';
      if (String(errCode).includes('MessageNotExist')) {
        continue;
      }
      
      // For actual database or processing errors, log them. 
      // Because we DID NOT acknowledge the message, RocketMQ will automatically 
      // retry delivering it later based on your retry policy.
      console.error('[Consumer] Error processing messages:', error);
      
      // Optional: Add a brief sleep to prevent tight error loops
      await new Promise(resolve => setTimeout(resolve, 5000));
    }
  }
}

// Start the continuous polling
startWorker();

Why This Architecture Will Not Break


1. Connection Pooling with PolarDB

During a reconnection event, you might have thousands of messages arriving per second. If your Node.js script tried to open a new database connection for every single message, your PolarDB instance would quickly exhaust its available connections and start refusing queries. By using mysql.createPool({ connectionLimit: 10 }), the consumer caps concurrent database work: excess queries wait in the pool’s internal queue instead of opening new connections. The pool acts as a dam, letting data flow into PolarDB at a sustainable rate.


2. The Idempotency Check

By actively querying the local_record_id before performing the INSERT, we neutralize the threat of duplicate data. Even if RocketMQ delivers the exact same message three times due to network instability, only the very first one will result in a database write. The subsequent two will be acknowledged and safely discarded.
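One caveat: if you ever run multiple consumer instances, two workers could both pass the SELECT check before either performs its INSERT. A UNIQUE index on local_record_id closes that window at the database level; the insert path then only needs to treat a duplicate-key error as a benign duplicate rather than a failure. A sketch of that fallback, assuming the UNIQUE index exists (mysql2 surfaces MySQL’s duplicate-key error as code 'ER_DUP_ENTRY' / errno 1062; insertIdempotently is an illustrative helper, not part of the script above):

```javascript
// Treat MySQL duplicate-key errors as benign duplicates instead of failures.
// mysql2 exposes MySQL's error name on err.code and the numeric code on err.errno.
function isDuplicateKeyError(err) {
  return Boolean(err) && (err.code === 'ER_DUP_ENTRY' || err.errno === 1062);
}

// Attempt the INSERT directly and let the UNIQUE index on
// local_record_id arbitrate races between concurrent consumers.
async function insertIdempotently(pool, sql, params) {
  try {
    await pool.execute(sql, params);
    return 'inserted';
  } catch (err) {
    if (isDuplicateKeyError(err)) return 'duplicate'; // safe to acknowledge
    throw err; // real failure: skip the ACK so RocketMQ redelivers
  }
}
```

With this in place, the preliminary SELECT becomes an optimization rather than the correctness guarantee.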


3. Separation of Processing and Acknowledgment

Notice that consumer.ackMessage happens after the PolarDB insert is fully complete. If the PolarDB database temporarily goes offline, the INSERT statement will fail and throw an error. The script will jump to the catch block, skipping the acknowledgment. Because RocketMQ never received an “ACK,” it assumes the message failed and will automatically put it back in the queue to be retried later. This guarantees zero data loss.
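The contract is visible in a tiny pure-JavaScript simulation (no SDK involved; deliverOnce and the in-memory queue are illustrative stand-ins for RocketMQ’s redelivery behavior):

```javascript
// Simulation of the process-then-ACK contract: a message leaves the
// queue only when the handler succeeds and the message is acknowledged.
function deliverOnce(queue, handler) {
  const msg = queue[0]; // peek; the message is redelivered until acknowledged
  if (!msg) return 'empty';
  try {
    handler(msg);   // e.g. the PolarDB INSERT
    queue.shift();  // ACK: only now is the message removed
    return 'acked';
  } catch (e) {
    return 'redeliver'; // no ACK: the message stays queued for retry
  }
}

// First attempt fails (DB offline), so the message is redelivered;
// the retry succeeds and the message is acknowledged.
const queue = [{ MessageKey: 'rec-001' }];
let dbOnline = false;
const handler = (msg) => { if (!dbOnline) throw new Error('DB offline'); };

console.log(deliverOnce(queue, handler)); // 'redeliver'
dbOnline = true;
console.log(deliverOnce(queue, handler)); // 'acked'
console.log(queue.length);                // 0
```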

You now have a complete, end-to-end understanding of how to build an offline-first, highly resilient architecture using Alibaba Cloud ENS, RocketMQ, and PolarDB.

Next, set up Alibaba Cloud Log Service (SLS) for real-time synchronization monitoring.
