Java消息隊列任務的平滑關閉

1.問題背景

對于消息隊列的監聽,我們一般使用Java寫一個獨立的程序,在Linux服務器上運行。程序啟動后,通過消息隊列客戶端接收消息,放入一個線程池進行異步處理,并發的快速處理。

那么問題來了,當我們修改程序后,需要重新啟動任務的時候,如何保證消息的不丟失呢?

正常來說,訂閱者程序關閉后,消息會在發送者隊列中堆積,等待訂閱者下次訂閱消費,所以未接收的消息是不會丟失的。唯一可能丟失的消息,就是在關閉的一瞬間,已經從隊列中取出但還沒有處理完畢的消息。

因此我們需要一套平滑關閉的機制,保證在重啟的時候,消息可以正常處理完成。

2.問題分析

平滑關閉的思路如下:

  1. 在關閉程序時,首先關閉消息訂閱,這個時候消息都在發送者隊列中

  2. 關閉本地消息處理線程池(等待本地線程池中的消息處理完畢)

  3. 程序退出

關閉消息訂閱:一般消息隊列的客戶端都提供關閉連接的方法,具體可以自行查看api

關閉線程池:Java的ThreadPoolExecutor線程池提供shutdown()shutdownNow()兩個方法,區別是前者會等待線程池中的消息都處理完畢,后者直接停止線程的執行并返回list集合。因為我們需要使用shutdown()方法進行關閉,并通過isTerminated(),方法判斷線程池是否已經關閉.

那么問題又來了,我們如何通知到程序,需要執行關閉操作呢?

在Linux中,我們可以用kill -9 pid關閉進程,除了-9之外,我們可以通過 kill -l查看kill 命令的其它信號量,比如使用 12) SIGUSR2 信號量

我們可以在Java程序啟動時,注冊對應的信號量,對信號量進行監聽,在收到對應的kill操作時,執行相關的業務操作。

偽代碼如下

 //注冊linux kill信號量  kill -12Signal sig = new Signal("USR2");
Signal.handle(sig, new SignalHandler() {    @Override
    public void handle(Signal signal) {        //關閉訂閱者
        //關閉線程池
        //退出
    }
});

下面通過一個demo模擬相關邏輯操作

首先模擬一個生產者,每秒生產5個消息

然后模擬一個訂閱者,收到消息后交給線程池進行處理,線程池固定4個線程,每個消息處理時間1秒,這樣線程池每秒會積壓1個消息。

package com.lujianing.demo;import sun.misc.Signal;import sun.misc.SignalHandler;import java.util.concurrent.*;/**
 * @author [email protected]
 * @Description:
 * @date 2016/11/14
 */public class MsgClient {    //模擬消息隊列訂閱者 同時4個線程處理
    private static final ThreadPoolExecutor THREAD_POOL = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);    //模擬消息隊列生產者
    private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();    //用于判斷是否關閉訂閱
    private static volatile boolean isClose = false;    public static void main(String[] args) throws InterruptedException {
        BlockingQueue <String> queue = new ArrayBlockingQueue<String>(100);
        producer(queue);
        consumer(queue);
    }    //模擬消息隊列生產者
    private static void producer(final BlockingQueue  queue){        //每200毫秒向隊列中放入一個消息
        SCHEDULED_EXECUTOR_SERVICE.scheduleAtFixedRate(new Runnable() {            public void run() {
                queue.offer("");
            }
        }, 0L, 200L, TimeUnit.MILLISECONDS);
    }    //模擬消息隊列消費者 生產者每秒生產5個   消費者4個線程消費1個1秒  每秒積壓1個
    private static void consumer(final BlockingQueue queue) throws InterruptedException {        while (!isClose){
            getPoolBacklogSize();            //從隊列中拿到消息
            final String msg = (String)queue.take();            //放入線程池處理
            if(!THREAD_POOL.isShutdown()) {
                THREAD_POOL.execute(new Runnable() {                    public void run() {                        try {                            //System.out.println(msg);
                            TimeUnit.MILLISECONDS.sleep(1000L);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        }
    }    //查看線程池堆積消息個數
    private static long getPoolBacklogSize(){        long backlog = THREAD_POOL.getTaskCount()- THREAD_POOL.getCompletedTaskCount();
        System.out.println(String.format("[%s]THREAD_POOL backlog:%s",System.currentTimeMillis(),backlog));        return backlog;
    }    static {
        String osName = System.getProperty("os.name").toLowerCase();        if(osName != null && osName.indexOf("window") == -1) {            //注冊linux kill信號量  kill -12
            Signal sig = new Signal("USR2");
            Signal.handle(sig, new SignalHandler() {                @Override
                public void handle(Signal signal) {
                    System.out.println("收到kill消息,執行關閉操作");                    //關閉訂閱消費
                    isClose = true;                    //關閉線程池,等待線程池積壓消息處理
                    THREAD_POOL.shutdown();                    //判斷線程池是否關閉
                    while (!THREAD_POOL.isTerminated()) {                        try {                            //每200毫秒 判斷線程池積壓數量
                            getPoolBacklogSize();
                            TimeUnit.MILLISECONDS.sleep(200L);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println("訂閱者關閉,線程池處理完畢");
                    System.exit(0);
                }
            });
        }
    }
}

當我們在服務上運行時,通過控制臺可以看到相關的輸出信息,demo中輸出了線程池的積壓消息個數

java -cp /home/work/lujianing/msg-queue-client/* com.lujianing.demo.MsgClient

輸入圖片說明

另打開一個終端,通過ps命令查看進程號,或者通過nohup啟動Java進程拿到進程id

ps -fe|grep MsgClient

輸入圖片說明

當我們執行kill -12 pid的時候 可以看到關閉業務邏輯

平滑關閉

3.問題總結

在部門的實際業務中,消息隊列的消息量還是挺大的,某些業務高峰時每秒有幾百的消息量,因此對消息的處理要保證速度,避免消息積壓,也可以通過負載解決單個訂閱節點的壓力。

在某些業務場景中,對消息的完整性要求不那么高,那么就不用考慮重啟時的一點損耗。反之,就需要好好思考和設計了。

來源:開源中國

上一篇: 如何用四個月搞定Java?

下一篇: Java開發者需要了解哪些常見的開移動開發編程語言排行榜Top 6

分享到: 更多
鱼丸游戏飞禽走兽 即时比分大赢家体育 好运来幸运飞艇计划 欢乐生肖计划免费版 不倍投稳赚方案 三分时时计划 重庆老时时开奖 双面盘平台 ag电子查询注单号 pk10全天二期计划在线 老北京pk赛车计划软件 七月棋牌 七星彩开奖视频直播现场 天天棋牌送20元 重庆时时玩法破解 nba篮球投注算不算加时赛