短時間內重複交易防止 redis lock


#1

這篇寫在這邊,單純小小的東西而已,我們家有出現過的,使用者在短時間內送出兩個封包,所以因為系統 lag 兩個封包一起到,導致使用者做了兩次的交易的狀況,這邊可以用 RDBMS 或 Redis 來解

RDBMS 很簡單但是不是最佳解,transaction 內寫入 timestamp,且在 transaction 內檢查即可,但會牽涉到 lock 很麻煩,而比較漂亮的解法其實用 Redis 實作掉就好了,以下是 code

class User < ActiveRecord::Base
  ACTION_LOCK_KIND = [
    '金額轉出',
    '購物車結帳'
  ]
  def action_lock(kind , addon_timer = 10)
    key = "ACTION_LOCK_#{self.id}_#{kind.to_i}"
    return (
      $redis.multi do
        $redis.setnx(key , Time.now.to_i)
        $redis.expire(key , addon_timer)
      end
    )[0]
  end
end

使用

class ItemsController < ApplicationController
  def pay_money
    unless current_user.action_lock(1)
      flash[:warning] = "短時間內無法結帳兩次,請重新檢查沒有多的結帳後,再嘗試一次"
      redirect_to order_list_path
      return
    end
    #你其他的一般code
  end
end

okay 以上,流程解釋:

Redis 內有幾個東西可以玩,一個指令叫做 SETNX,如果一個 key 存在就設定它,如果 key 存在就忽略,會回傳 true / false,後面的 EXPIRE 則是設過期時間,一定是 true(因為前面有設 key 了)而中間用 MULTI 和 EXEC 來做 transaction,代表指令不可分割,而如果使用了 MULTI 最後回傳值是 array,array 的內容就是每個 Redis 指令的結果,所以裡面有兩個指令,會回兩個值,而我們只需要 SETNX 的值的意思

so~ 上面就是簡單的短效交易防止,以上


#2

之前有參考大大的這篇文章去做鎖,但最近剛好重看一下 文件 說重複使用 expire 會更新該 key 的到期時間,那這種情境下使用這種作法(因為 expire 一定會被促發) ,如果持續不斷來戳一個還沒到期的鎖,好像會一直無限延長下去?是這樣嗎?


#3

… 原來我的 action_lock 是兩年前寫的|||| 還真的歲月不饒人 X"DD

我後來寫了另外一版,你參考看看?(放在 controller 內,可能最上層,類似 application_controller)

def action_lock(token, sec_expire = 600 ,user = nil)
  return false if !sec_expire || sec_expire < 1
  return Rails.cache.increment("PERFIX_#{token}", 1, expires_in: sec_expire) == 1
end

這邊的缺點是要共用 memcached 就是了(要上 dalli gem),而非用 redis 來做這件事情, redis 也會乾淨點 (當然你也可以多開個 Redis 的 DB 序號之類的,就不會污染再一起了,redis 內也有 INCRBY 可用)

這個做法會漂亮些,我建議用新的這個做法就是(畢竟可以把一票垃圾都歸給 memcached …)

再來就是你對"鎖"的概念了 … 重點會是上面的 token 這個欄位怎麼組合?一般來說我大概都會用類似

"#{user_id}_#{kind}_#{serial}"

這邊說明一下,同一個使用者能不能做相同的操作,又可分為:

  1. 同一使用者操作(user_id)
  2. 同一使用者同一行為操作(user_id , kind)
  3. 同一使用者允許同一行為,但不允許同頁面連續操作(user_id , kind , serial)

附加說明這三個要件的不同(以下說明用,並非真實狀況,有的必需還要配額外的 lock 才行),(1)類似全站抽獎,同一個使用者只能抽一次,所以不管切瀏覽器或是用手機都只能抽一次,(2)類似使用者投票,有三個對象可以投,但每個對象只能投一次,(3)類似防止連點,有個使用者想要新增單,每個視窗按下去都必須成功,但不應該連續按兩次變成兩張單

以上,維度只是看你組合罷了,通常三維是我的使用極限,至於你說的『鎖定時間延續』問題也很正常,繼續上面的狀況:有一個使用者用了滑鼠連點裝置,在(3)的狀況下,連點 200000 次,應該成功幾張單?

okay … 我想你應該懂該如何組合了,如果你怕延續擋到你,那麼你應該配 random_serial 出去才是對唄?(每次重新整理都配新的 random_serial),當然凡事都有例外啦,類似你可以在 redis / memcached 內塞 timestamp 然後手工過期砍就好了,甚至單純的用 DB lock,因為這應該就不是 action_lock 的本意了就是,以上

& 發覺我當初把新版的寫在我私人的 blog 了 … 通常太困難或奇怪的東西會寫在那上面就是 X"DD


#4

我這邊的做法是 client 先跟 server 要一個 transaction id

然後 server 會隨機 generate 一個 id 出來,去 redis 問問看這個 id 是不是有人用了,
辦法是用 hsetnx 看看這個 id 為名的 hash 中有沒有 used_flag
如果沒有就會在這個時候設為 1;如果有的話 hsetnx 什麼事也不會做也不會影響 expire,然後再去 gen 另一個 id 出來用。

拿到可用的 id 之後,把整個 hash 設 expire,然後發回去給 client

client 用這個 transaction id 跟 server 發 request,這時候才傳實際的 payload,
server 這時候會檢查一些事情:

  1. hexists 檢查名稱為 id 的 hash 中 used_flag 是不是存在;不存在就表示這個 transaction id 不存在或過期,拋錯誤
  2. 檢查同樣的 hash 中 performing 存不存在(用 hsetnx,原理同上);存在就表示這個 transaction 處理中,拋錯誤
  3. 檢查 hash 中 data 存不存在;存在就表示已經有 client 曾經發過一樣的 transaction 了,檢查 data 裡面記錄的 request 跟這次有沒有一樣,一樣的話就從 data 拿出那一次記錄的 response 直接回給 client;不一樣的話就表示 client 智障(ry),拋 conflict 錯誤
  4. 先 persist 整個 hash,避免接下來的過程太久讓這個 transaction id 失效
  5. 實際 process transaction,並把 request 跟 response 都 hset 進去那個 hash,並且重新設 expire 讓 client 在一段時間內網路不順重發一樣的 request 的時候可以直接從上面的流程中取得 cached response

大致上就是這樣, code 的話大概是這樣:

class TransactionBase
  class << self
    def generate(*args)
      tr = self.new args

      id = tr.generate_id
      until $redis.hsetnx tr.redis_key(id), 'used_flag', 1
        id = tr.generate_id
      end
      $redis.expire tr.redis_key(id), tr.keep_during
      id
    end

    def perform(id, *args, &block)
      request_data = args.extract_options!

      tr = self.new args
      key = tr.redis_key(id)
      raise Errors::NotFound.new 'Transaction is not exist.' unless $redis.hexists key, 'used_flag'
      raise Errors::Locked.new 'Another request is processing this transaction.' unless $redis.hsetnx key, 'performing', 1

      original_request, response_data = JSON.parse($redis.hget(key, 'data') || '[]', symbolize_names: true)
      request_data.deep_symbolize_keys!

      if original_request # already performed
        raise Errors::Conflict.new 'Transaction has performed with another payload.' if request_data != original_request
      else
        $redis.persist key
        begin
          response_data = yield
          $redis.hset key, 'data', [request_data, response_data].to_json
        ensure
          $redis.hsetnx key, 'used_flag', 1
          $redis.expire key, tr.keep_during
        end
      end

      response_data
    ensure
      $redis.hdel key, 'performing' # Release the lock
    end
  end

  def initialize(*args)
    @args = args
  end

  def transaction_namespace
    %w( transaction )
  end

  def generate_id
    Time.now.strftime('%H%M%S%6N') + Random.rand(256).to_s(16)
  end

  def redis_key(transaction_id)
    ns = transaction_namespace.join('.')
    "#{ns}:#{transaction_id}"
  end

  def keep_during
    10.minutes
  end
end
class PaymentTransaction < TransactionBase
  def transaction_namespace
    super + %W( payment )
  end
end
class Payment < ApplicationController
  def create # POST
    render json: { transaction_id: PaymentTransaction.generate }
  end

  def update # PUT
    render json: PaymentTransaction.perform(transaction_id, params_for_payment) do
      # do someting for the transaction and return response
      {
        status: 'success',
        # ...
      }
    end
  end

  private

  def transaction_id
    params[:transaction_id]
  end

  def params_for_payment
    params.permit(:from, :to, :amount).to_h
  end
end

差別在我不是單純做 lock,而是在 client 重發一樣的 request 到一樣的 transaction 的時候,會把 cached response 返回去,很多國外的支付 API 都會這樣做……


#5

你的好囉唆喔 … 交易的我有 SSDB 做類似的事情哩,我叫 serial builder,動作和你差不多但 code 少很多|||,主因,SSDB 有硬碟維持,我無法使用 redis 來完成,除非上 AOF 或 redislab …


#6

XDDDDDD

我直接做在一起了 我這邊是 ElasticCache 啦 所以還好