• 
    

    
    

      99热精品在线国产_美女午夜性视频免费_国产精品国产高清国产av_av欧美777_自拍偷自拍亚洲精品老妇_亚洲熟女精品中文字幕_www日本黄色视频网_国产精品野战在线观看 ?

      采用ScheduledThreadPoolExecutor執(zhí)行定時重試任務時內存溢出的分析及解決

      2016-05-14 11:20:05余志堅姜春志
      科技資訊 2016年7期
      關鍵詞:入隊線程調用

      余志堅 姜春志

      摘 要:開發(fā)JavaWeb項目中發(fā)現(xiàn)服務之間的調用存在超時情況,由于涉及的處理邏輯全部是異步,引入定時重試的機制,重試工具選擇了JDK自帶的ScheduledThreadPoolExecutor。當A服務依賴B服務,B服務由于在業(yè)務高峰期處理能力降低,導致大量A服務過來的請求超時,A加入了超時重試機制,間隔時間根據(jù)重試次數(shù)的多少來決定,次數(shù)越多,兩次重試之間間隔的時間越多,此時的業(yè)務高峰也會給A帶來大量請求,大量的超時會導致重試隊列迅速堆積,直到內存溢出。該文從線程池工作機制、ScheduledThreadPoolExecutor實例的創(chuàng)建,獲取重試任務的過程以及提交任務的過程角度分析,并通過源代碼的剖析和測試工具MyEclipse進行演示測試內存泄露的情況,得出避免內存泄露的解決方案。

      關鍵詞:ScheduledThreadPoolExecutor 線程池 內存溢出

      中圖分類號:TP3 文獻標識碼:A 文章編號:1672-3791(2016)03(a)-0015-03

      1 ScheduledThreadPoolExecutor實例的創(chuàng)建過程及線程池工作機制

      1.1 ScheduledThreadPoolExecutor實例的創(chuàng)建過程

      重試工具選擇了JDK自帶的ScheduledThreadPoolExecutor。ScheduledThreadPoolExecutor實例的創(chuàng)建過程如下:ScheduledThreadPoolExecutor實例的創(chuàng)建過程如下:(1)獲取當前機器上處理器的數(shù)量;(2)使用Google的ThreadFactoryBuiler創(chuàng)建指定格式名稱的線程,以方便查看問題;(3)有需要被拒絕的任務時,拋出異常;(4)創(chuàng)建定時任務池;打開MyEclipse工具顯示相對的代碼:int corePoolSize=Runtime.getRuntime().availableProcessors();

      ThreadFactory tf=new ThreadFactoryBuilder().setNameFormat("FailureRetryTask-pool-%d").build();

      RejectedExecutionHandler handler=new ThreadPoolExecutor.AbortPolicy();

      ScheduledThreadPoolExecutor taskService=new ScheduletThreadPooExecutor(corePoolSize,tf,handler);

      線程池就是多個線程在一個隊列中取任務執(zhí)行,提交的任務會被放入隊列中等待線程執(zhí)行,故隊列要設置一個大小。線程池同樣會根據(jù)任務繁忙程度來動態(tài)調整連接數(shù),空閑時保持最小連接數(shù),繁忙時增加連接,但不會超過上限,具有伸縮性,線程的創(chuàng)建和銷毀也需要消耗系統(tǒng)資源,線程的連接重用就可以避免這部分損失,具有重用性。

      1.2 線程池工作機制

      線程獲取任務的策略就是如果當前線程池運行狀態(tài)正常,則阻塞等待任務,否則直接返回或等待有限時間后返回。線程池中線程的主要任務就是獲取任務,然后執(zhí)行,然后再去獲取任務,如此循環(huán),這就實現(xiàn)了線程池中線程的可重用。

      Worker封裝了任務,同時創(chuàng)建了新的線程,并被添加到集合workers中,這個workers其實就是最核心的線程池。通過run方法實現(xiàn)重用。private final HashSet workers=new HashSet();

      public void run(){

      try{

      Runnable task=firstTask;

      firstTask=null;

      while(task!=null||(task=getTask())!=null){

      runTask(task);

      task=null;}}

      finally{

      workerDone(this);

      }

      }

      Runnable getTask(){

      for(;;){

      try{

      int state=runState;

      if(state>SHUTDOWN){return null;}

      Runnable r;

      if(state==SHUTDOWN){r=workQueue.poll();}

      else if(poolSize>corePoolSize||allowCoreThreadTimeOut){

      r=workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS);

      }else{r=workQueue.take();}

      if(r!=null){return r;}

      if(workerCanExit()){

      if(runState>=SHUTDOWN){interruptIdleWorkers();}

      return null;

      }

      }

      catch(InterruptedException ie){}

      }

      }

      private boolean workerCanExit(){

      final RenntrantLock mainLock=this.mainLock;

      mainLock.lock();

      boolean canExit; try{canExit=runState>=STOP||workQueue.isEmpty()||(allowCoreThreadTimeOut&&poolSize>Math.max(1,corePoolSize));

      }finally{mainLock.unLock();}

      return canExit;

      }

      如果此時線程池運行狀態(tài)是終止(runState >= STOP),或者隊列為空,或者允許核心線程超時并且線程池中線程數(shù)量大于最小線程數(shù)量,那么方法將返回true。再回到getTask方法,調用workerCanExit方法的前提是沒有獲取到任務,根據(jù)上邊獲取任務的過程,這幾個條件都有可能成立,所以此時getTask方法可以返回null,上層Worker的run方法從while循環(huán)重返回,整個線程結束,這就實現(xiàn)了線程池的可伸縮。

      2 ScheduledThreadPoolExecutor獲取任務的過程

      在getTask()中,描述了整個獲取任務的過程,如果線程池運行狀態(tài)已經(jīng)是SHUTDOWN了,調用非阻塞方法poll,因為如果當前有任務,那么可以獲取到任務并返回,如果沒有任務,也沒有必要阻塞在隊列上等待任務,因為已經(jīng)SHUTDOWN,后續(xù)不會再有任務進入。

      如果當前線程數(shù)大于最小線程數(shù),或者核心線程也可以做超時處理,意味著如果獲取不到任務就可以銷毀一部分線程了,所以poll方法設置了等待時間,超時后立即返回。

      另一種情況是線程池還在運行狀態(tài),并且當前線程數(shù)不大于最小線程數(shù),同時也不允許最小線程數(shù)以內的線程超時,這個時候線程就要調用阻塞方法take,等待任務進入隊列以后才返回。

      3 ScheduledThreadPoolExecutor提交任務的執(zhí)行過程

      ScheduledThreadPoolExecutor提交任務的執(zhí)行過程,首先提交任務:taskService .schedule(new Runnable(){public void run(){}},1,TimeUnit,DAYS);

      ScheduledThreadPoolExecutor通過schedule方法提交定時任務,schedule方法源碼如下:

      public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit){

      if(command==null||unit==null){throw new NullPointerException();}

      if(delay<0){delay=0;}

      RunnableScheduledFuture<?> t=decorateTask(command,new ScheduledFutureTask(command,null,triggerTime));

      delayedExecute(t);

      return t;

      }

      提交的任務會被封裝成ScheduledFutureTask類型對象。

      分析delayedExecute方法:private void delayedExecute(Runnable command){

      if(isShutDown()){reject(command);return;}

      if(getPoolSize()

      super.getQueue().add(command);

      }

      如果線程的運行狀態(tài)不是RUNNNING或者入隊列沒有成功,則采用線程池的構造方法中設置的拒絕策略來處理任務。

      如果當前線程池中的線程數(shù)量poolSize小于線程池核心線程的數(shù)量corePoolSize,執(zhí)行prestartCoreThread(),該方法會創(chuàng)建一個新的線程來執(zhí)行任務,如果prestartCoreThread()創(chuàng)建的新線程執(zhí)行任務失敗或者當前線程池中的線程數(shù)量poolSize大于等于線程池核心線程數(shù)量corePoolSize,當若線程池的運行狀態(tài)是RUNNING并且入隊成功,由于在多線程環(huán)境下,狀態(tài)隨時可能會改變,此時線程池的運行狀態(tài)runState不是RUNNING或者線程池中沒有可用的線程(poolSize==0),要確保進入的任務被執(zhí)行處理,線程池在初始化完成以后是空的,并沒有線程,如果在服務器中使用線程池,服務重啟后有大量請求進入,則要同時創(chuàng)建多個線程,而且創(chuàng)建過程是加鎖同步的,會導致一定的競爭,解決辦法就是線程池初始化時調用prestartAllCoreThreads方法啟動核心線程數(shù)量的線程,這樣就能在線程池中的線程就緒以后才開始接收請求。

      通過getQueue方法獲取任務隊列,并且調用add方法向隊列中添加任務,dq的定義:

      private final DelayQueue dq=new DelayQueue();

      public boolean add(Runnable x){return dq.add((RunnableScheduleFuture)x);}

      可以看出dq是阻塞隊列,線程池中的線程都是在隊列中取數(shù)據(jù),ScheduledThreadPoolExecutor中的構造方法里的隊列的實現(xiàn)使用鏈表結構的阻塞隊列,add方法內部調用offer方法,offer源碼如下:public boolean offer(E e){

      final ReentrantLock lock=this.lock();

      lock.lock();

      try{

      E first=q.peek();

      q.offer(e);

      if(first==null||e.compareTo(first)<0){

      available.singalAll();

      return true; }

      }finally{lock.unlock();}}

      這方法需要在多線程環(huán)境下同步執(zhí)行,會用到鎖Lock。鎖實現(xiàn)的大概原理如下。

      Lock實現(xiàn)鎖的方式是通過排他性更新共享變量,更新成功的線程繼續(xù)執(zhí)行,沒有更新成功的線程將會被阻塞。Lock的共享變量state在可重入鎖中可以用來表示一個線程調用了幾次lock方法,也就是有幾次獲取鎖的行為。Lock的功能實現(xiàn)是通過內部聚合了抽象隊列同步器(AQS),同步器有公平和非公平之分。非公平同步器對于新來的線程會嘗試獲取,不成功以后才會進入等待隊列,而公平同步器則會首先判斷是否排隊。AQS中會保存獲取鎖的當先線程的引用。如果一次性嘗試獲取鎖不成功,則線程會進入隊列,循環(huán)嘗試獲取鎖。

      peek方法會獲取隊列的第一個元素,只是獲取,并沒有出隊列。接著調用優(yōu)先級隊列PriorityQueue類型變量q的offer方法將隊列入隊,優(yōu)先級隊列會對任務進行排序,距離執(zhí)行時間越近,位置越靠前。下邊的if判斷可以這樣理解,first是在當前任務入隊之前獲取的,也就是隊列中原有的第一個任務,compareTo的這段比較是說當前任務的執(zhí)行時間比隊列中第一個任務執(zhí)行時間還要早,如果first是null,那么當前任務入隊后將是第一個元素,如果當前任務的執(zhí)行時間比隊列中第一個任務的執(zhí)行時間早,那么當前入隊后也將是第一個元素,只要這兩個條件有一個成立了,這個if的判斷條件就為true,就要執(zhí)行Condition類型的available變量的signalAll方法,喚醒等待的線程工作。

      4 隊列的大小判斷

      隊列的大小是決定內存溢出最直觀的因素,首先來看看優(yōu)先級隊列PriorityQueue的offer方法:public boolean offer(E e){

      if(e==null){throw new NullPointerException();}

      modCount++;

      int i=size;

      if(i>=queue.length){grow(i+1);}

      size=i+1;

      if(i==0){queue[0]=e;}

      else{siftUp(i,e);}

      return true;

      上述代碼表示如果隊列中元素的個數(shù)(size)大于等于隊列的長度,將要通過grow方法擴容,如下:private void grow(int minCapacity){

      if(minCapacity<0){throw new OutOfMemoryError();}

      int oldCapacity=queue.length;

      int newCapacity=((oldCapacity<64)?((oldCapacity+1)*2):((oldCapacity/2)*3));

      if(newCapacity<0){newCapacity=Integer.MAX_VALUE;}

      if(newCapacity

      queue=Arrays.copyof(queue,newCapacity);

      }

      若隊列容量小于64,那就在原有基礎上加1然后擴大2倍,這種情況絕對不會造成內存的溢出問題。如果大于等于64呢?直接擴容一半,然后將值賦給一個int型變量,當某種情況如果超過int類型的最大值了,JDK的處理是賦值成Integer的MAX_VALUE為2147483647,也就是最大的隊列長度是2 G多,如果一個對象的大小按照50個字節(jié)來算,將會占用100 G的內存必定溢出。

      5 模擬內存溢出代碼測試

      當業(yè)務高峰給服務器帶來大量請求,大量的超時會導致重試隊列迅速堆積,直到內存溢出,下面就通過代碼來測試一下:模擬大量的添加任務,并且任務在調度隊列中堆積,推遲一天執(zhí)行。

      while(true){

      taskService.schedule(new Runnable(){public void run(){}},1,TimeUnit.DAYS);

      }

      虛擬機啟動參數(shù)-:

      -Xms32M -Xmx32M -Xmn10M-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=d:/

      運行輸出:

      java.lang.OutOfMemoryError:Java heap space Dumping head to d:/\java_pid12884.hprof...

      Heap dump file created [44940425 bytes in 0.618 secs]

      內存溢出了,來看看內存快照(見表1)。

      6 解決方案及措施

      編譯好的java程序需要運行在JVM中,而JVM為程序提供并管理所需要的內存空間,JVM自帶一個用于回收沒有任何引用指向的對象的線程機制(垃圾回收器),但針對于ScheduledThreadPoolExecutor提交的任務會被封裝成ScheduledFutureTask類型對象且每個對象中又有Sync成員變量。解決的辦法可以是手動判斷隊列的大小,通過taskService.getQueue().size()方法,通過Jmap內存分析工具估算每個對象的大小,Jmap是一個可以輸出所有內存中對象的工具,甚至可以將JVM 中的heap,以二進制輸出成文本。打印出某個Java進程內存內的所有‘對象的情況,結合能夠為隊列分配的內存大小,計算出隊列容納任務的最大數(shù)量,以避免內存溢出。

      參考文獻

      [1] 逯昌浩.淺析多核處理器條件下的Java編程[J].中國科技信息,2009(12):128,130.

      [2] 張復興,曾新洲.擴展線程池模型及性能分析[J].計算技術與自動化,2007(4):110-112.

      [3] (美)Bruce Eckel.Java編程思想[M].陳昊鵬,譯.北京:機械工業(yè)出版社,2007.

      猜你喜歡
      入隊線程調用
      今天我入隊——入隊儀式
      少先隊活動(2022年5期)2022-06-06 03:45:02
      1+1我們這樣學隊章:我們的入隊誓詞
      少先隊活動(2020年7期)2020-08-14 01:17:50
      核電項目物項調用管理的應用研究
      LabWindows/CVI下基于ActiveX技術的Excel調用
      測控技術(2018年5期)2018-12-09 09:04:46
      今天我入隊了
      入隊風波
      淺談linux多線程協(xié)作
      基于系統(tǒng)調用的惡意軟件檢測技術研究
      利用RFC技術實現(xiàn)SAP系統(tǒng)接口通信
      Linux線程實現(xiàn)技術研究
      新平| 普兰店市| 太原市| 新巴尔虎左旗| 襄汾县| 漳平市| 忻城县| 庐江县| 岳西县| 横峰县| 蓬安县| 安新县| 临潭县| 宜川县| 文昌市| 芦溪县| 青铜峡市| 桃园市| 将乐县| 陆良县| 孙吴县| 云梦县| 青浦区| 曲靖市| 英山县| 施秉县| 方山县| 泰兴市| 正宁县| 介休市| 洛隆县| 九龙城区| 高碑店市| 城口县| 庆阳市| 台江县| 吉林市| 涿州市| 汤阴县| 芮城县| 光山县|