請教 ConnectionPool 的一段 Thread code 意思


#1

Hi,各位好,小弟最近在學習跟探索 thread, process 相關的知識,然後剛好看到 connection_pool 的原始碼,其中測試 (test/test_connection_pool.rb:183) 裡面有段
Thread.pass while thread.status == 'run'
雖然有看過一些 Thread 教學文章跟 API 文件,但仍不知道這行 code 的用意是。
因為看起來它的邏輯是在『不斷地讓出當前 thread 的執行順位,如果 thread 還在執行狀態的話』。
但還是想當地抽象跟不知道其用意何在,像是:

  1. 為何要不斷地 Thread.pass ?! (如果 thread 是處於執行階段,那當前的 thread 應該就是本身吧,那 Thread.pass 不就是把自己的執行狀態讓出去,除非是多核心)

  2. while thread.status == 'run' 看起來是當 thread 不再是運行狀態就會跳離出這個條件式,但僅僅不再是 run 的狀態應該也不能保證該 thread 已經被處理完了吧?!因為如果那個 thread 有在等待外部 IO 或是一些情況進入 sleep,然後就跳離出去了,但 thread 依然還沒被執行完。

因為上述兩點也想不透所以我也無法理解這句真正的目的為何,希望還請高人指點迷津,感謝各位。

最後附上完整的 code:

    pool = ConnectionPool.new(timeout: 0, size: 1) { NetworkConnection.new }

    t = Thread.new do
      pool.with do |net|
        net.do_something
        sleep 0.01
      end
    end

    Thread.pass while t.status == 'run'

    assert_raises Timeout::Error do
      pool.with { |net| net.do_something }
    end


#2

依照 Thread#status 的文件1中提到,status 只有以下 5 種結果:

  • "sleep"

    Returned if this thread is sleeping or waiting on I/O

  • "run"

    When this thread is executing

  • "aborting"

    If this thread is aborting

  • false

    When this thread is terminated normally

  • nil

    If terminated with an exception.

也就是說只有 "run" 是運作中,其他都是已結束。


#3

這裡 Thread.pass while t.status == 'run' 想要表達的是:

如果這個 Thread t 還在執行中,就把自己的 thread pass 出去 context switch;
意思是說:

如果 Thread t 還在運作,我就停下來等(Thread.pass)他結束。

為了不因為使用無限迴圈(true while t.statue == 'run')導致的 CPU 100% 問題,這裡改用
Thread.pass 把 CPU 資源交出去,等到下一次 context switch 回來這個 thread 的時候再繼續檢查那個 t 是不是已結束。


#4

Thread 有一個很大的點是:他是平行處理的,所以同時間可能會有很多(也有可能是一個) Thread 在執行。

而作業系統也不會希望一個 Thread 獨佔太久,所以作業系統會嘗試不斷的作 Context switch 把資源收回來,再分配給其他 Thread 輪流使用。(相關說明可以看 wikipedia 上面的解釋 https://en.wikipedia.org/wiki/Context_switch

那麼範例程式碼中作的事情大致上區分的話可以看成是:

# 建立一個會 Timeout 的機關
    pool = ConnectionPool.new(timeout: 0, size: 1) { NetworkConnection.new }

# 初始化一個新的 Thread t
    t = Thread.new do
      pool.with do |net|
        net.do_something
# 強制休息 0.01,這裡應該要觸發 Timeout 的機關
        sleep 0.01
      end
    end

# 等待 t 結束
    Thread.pass while t.status == 'run'

# 這裡應該要抓到 Timeout
    assert_raises Timeout::Error do
      pool.with { |net| net.do_something }
    end

這裡 main Thread 跟 t Thread 有可能同時,也有可能不是同時執行的,
但是 t Thread 最後的狀態會被保存在 t.status 中,只要 OS 把資源交給 main Thread 執行的時候就檢查一下,
如果 t Thread 已經結束,就可以來檢查看看他剛剛有沒有乖乖觸發 Timeout;
如果 t Thread 還沒結束,就繼續等 OS 把資源交給 t 執行,並且讓出 main 自己的資源,直到下次 OS 又輪給 main。
(如果 t 還沒結束,這時候 main 來檢查有沒有觸發 Timeout 根本沒有意義)


#5

感謝大大回覆
不過 sleep 文件給的回覆是
Returned if this thread is sleeping or waiting on I/O
所以 sleep 應該無法判斷他已結束吧,可能還會被喚醒變回 run 。


#6

我覺得是因為一休息就會 timeout 的關係吧,就算是 sleep 狀態也應該要觸發 Timeout Exception,就可以直接下去抓看看有沒有觸發了


#7

所以我都用 rufus-scheduler 然後 main 都在睡,無聊丟些狀況出來,這樣就算 thread 掛掉 main 也不會死,而 rufus-scheduler 可以把下個排程用的 thread 再度喚醒繼續做事 …


#8

Hi, 今天又花了一點時間和思考大大的回覆,又有幾個新的問題想問:

  1. 如果 Thread.pass while t.status == 'run' 是希望等待 t 結束,那為何不使用 t.join ,這樣 main thread 不是一定會等它跑完 ?

  2. 後來嘗試自己試了一些例子:

thread = Thread.new do
  10.times do
    puts 'lalalala'
    puts 'landland'
    puts 'haha'
  end
  sleep 20
  puts 'wake up!'
end

Thread.pass while thread.status == 'run'

puts 'This is main thread call.'
puts "thread's status is #{thread.status}."

結果會是:

lalalala
landland
haha
This is main thread call.
lalalala
landland
haha
lalalala
landland
haha
lalalalathread's status is sleep.

看起來 thread 也沒跑完就接著下去了(甚至還沒進入 sleep 20),所以我覺得比較可能關係,是否它是等到 thread 本身被 scheduler 離開(thread 進入 sleep)去調用 main 時就跳出 while thread.status == 'run',亦即該 thread 第一次被 scheduler 切換出去,而不是靠 Thread.pass 本身把自身行使權讓出去?!

  1. 其實對這句 t Thread 最後的狀態會被保存在 t.status 中,只要 OS 把資源交給 main Thread 執行的時候就檢查一下 有點感到抽象,這樣是否可以說在除了使用 Thread.pass 以外的可能,當 main 執行時去觀察 t.status 時都不會是 run ?! 因為能被切換到 main 去執行時 thread t 應該不是執行完畢就是在睡覺、等待吧?? thread t 剛被建立的時候 status 我不確定是不是 run @@
thread = Thread.new do
  10.times do
    puts 'Hello, world.'
  end
end

puts "thread's status is #{thread.status}" #====>  thread's status is run
  1. 題外問題,在 thread 裡面我們可以使用 mutex.synchronize 來確保關鍵地方的原子操作以防無預期地被 scheduler 調用走,那麼 multiple processes 是否也有類似的機制呢? 還是說其實 processes 彼此不共享任何資源,大家的資料都獨立的,所以基本上沒有原子操作的需要 ?

以上,感謝 Orz


#9

最近在寫 … 進階 thread 應用,剛好想找主題貼上來,看看唄?

require 'thread'

module TT
  def self.tester
    @@queue1 = Queue.new
    @@queue2 = Queue.new
    @@package = []

    #宇宙無敵快的thread
    @@fast_thread = Thread.new do
      begin
        loop do
          @@queue1.push(rand) if Time.now.to_i / 6 % 2 == 0 #模擬需求產生中斷頻率
          sleep(0.00333333) #可以嘗試把這行拿掉 ...
        end
      rescue
        puts $! , $@
      end
    end

    #末端整理器
    @@runner_thread = Thread.new do
      begin
        while data = @@queue2.pop(false)
          puts "run : #{data.length} : #{data[0]} : #{data[-1]}"
        end
      rescue
        puts $! , $@
      end
    end

    #收斂打包器
    @@package_thread = Thread.new do
      begin
        loop do
          t = Time.now.to_f
          package = []
          begin #先用最快的方式 merge
            while data = @@queue1.pop(true)
              package << data
              break if Time.now.to_f - t > 1
            end
          rescue ThreadError => e
            raise e unless e.message == 'queue empty'
          end
        
          #判別是否等待,等待時期一樣merge
          while @@queue2.length >= 1 || @@runner_thread.status == 'run'
            begin #先用最快的方式 merge
              while data = @@queue1.pop(true)
                package << data
                break if Time.now.to_f - t > 1
              end
            rescue ThreadError => e
              raise e unless e.message == 'queue empty'
            end
          end
        
          if package.empty?
            print '-'
          else
            @@queue2 << package
          end
          sleep(0.333333)
        end
      rescue
        puts $! , $@
      end
    end

    sleep(65535) #main sleep forever
  end
end
TT.tester

上面的 code 在示範宇宙無敵快的 thread 與一個可能宇宙超級慢的 thread 對接時,中間插一個 buffer / merge / proxy 的 thread 來做整理,三個角色缺一不可 … 而這邊用 Ruby 內建的 Queue (FIFO) 應該最方便才是,也範例了 Queue 的兩種型態 (block / non-block)

Thread 系列最重要的記得就是 Queue & Mutex,其餘一般變數其實都可能發生 race condition … 類似你把一個 int / hash … 弄成 global var 後在不同的 thread 下做處理,應該都會遇到 race condition 才是,不過剩下的就是經驗唄,類似噴到死後 debug 才會知道該怎麼改之類的,單純你開的東西太小了還沒遇到罷了,這很類似用 RDBMS 做簡易的 Blog 系統,根本也用不到 lock,但嚴謹的交易處理時,哪天噴了,就會知道 lock 的重要性唄 … |||

最後 … multi process 是不用做 race condition 啦,不過那是在 memory 的狀況下,如果你有 file / connection 之類的還是會發生唄,而不做 race condition 的處理的原因也是因為你根本拿不到也影響不到 … 所以才不用處理哩 … 而 Ruby 下開 multi process 有點麻煩(收斂部分),不如改開 golang 會好點就是


#10

他沒有要等 t 完全結束,他只是想要讓 t 去執行 sleep 然後進入睡眠,這樣 t 一定要觸發 Timeout

main thread 的任務就只是要讓 t timeout 然後回來檢查他是不是有真的拋出 timeout exception
如果用 Thread.joint 真的跑完的話需要一些時間,但是 main 其實只要等他開始 sleep 就可以了,
因為 timeout 的閥值是 0,只要一 sleep 就一定要 timeout