• Storing this in MySQL is definitely not a good fit. Use a distributed database built for massive data sets, such as Cassandra or DynamoDB: Cassandra has excellent write performance, and DynamoDB scales easily and supports an in-memory cache. Hot data needs to be handled separately.

  • bugsnag — you can configure how often it emails you about the same error.

  • Your requirements can be met without any hacks. They boil down to the following:

    1. Imports need to be fast
    2. Exports need to be fast
    3. Errors need to be tracked and reported

    My approach is to split the work up, run the pieces on several Sidekiq workers at the same time, and then merge the results.

    For (1), imports can use CSV.foreach; for (2), exports can use find_in_batches / find_each; for (3), error tracking just needs a model that records the data and logs produced while a job runs.

    # QueryThread splits a query over a large data set into several smaller tasks.
    # a = Concurrent::Array.new
    # QueryThread.split(Subscription.where("status = ?", "canceled").where("id < 1000"), 5) do |subquery, idx, min_id, max_id|
    #   subquery.find_each do |sub|
    #     a << [idx, sub.id, min_id, max_id]
    #   end
    # end
    #
    class QueryThread
      def initialize(query, min_id, max_id, index, &block)
        @thread = Thread.new do
          subquery = query.where("#{query.table_name}.id >= ? and #{query.table_name}.id <= ?", min_id, max_id)
          block.call(subquery, index, min_id, max_id)
        end
      end
    
      def join
        @thread.join
      end
    
      class << self
    
        def split(query, thread_count, &block)
          arguments_groups = split_to_array(query, thread_count)
    
          query_threads = []
          arguments_groups.each do |argument|
            query_threads << QueryThread.new(query, argument[:min_id], argument[:max_id], argument[:index], &block)
          end
          query_threads.each(&:join)
        end
    
        # split the query into id ranges based on its smallest and largest id
        def split_to_array(query, worker_number)
          total_min_id = query.pluck("min(#{query.table_name}.id)").first.to_i
          total_max_id = query.pluck("max(#{query.table_name}.id)").first.to_i
          split_range(total_min_id, total_max_id, worker_number)
        end
    
        def split_range(total_min_id, total_max_id, worker_number)
          if total_max_id == 0
            return []
          elsif (total_max_id - total_min_id + 1) < worker_number
            # no need for that many workers
            return [{min_id: total_min_id, max_id: total_max_id, index: 0}]
          end
    
          range_in_thread = (total_max_id - total_min_id + 1) / worker_number
          result = []
          (worker_number - 1).times.each do |i|
            min_id = total_min_id + i * range_in_thread
            max_id = total_min_id + (i + 1) * range_in_thread - 1
            result << {min_id: min_id, max_id: max_id, index: i}
          end
    
          # last thread
          min_id = total_min_id + (worker_number - 1) * range_in_thread
          max_id = total_max_id
          result << {min_id: min_id, max_id: max_id, index: worker_number - 1}
          result
        end
    
      end
    end
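
    To see what the split arithmetic produces, here is a small worked example of split_range (derived directly from the code above, no extra behaviour):

    # QueryThread.split_range(1, 10, 3)
    # => [{min_id: 1, max_id: 3,  index: 0},
    #     {min_id: 4, max_id: 6,  index: 1},
    #     {min_id: 7, max_id: 10, index: 2}]  # the last group absorbs the remainder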
    

    Design a BatchJob to handle the data export:

    class BatchJob < ApplicationJob
      queue_as :default
    
      class_attribute :query_block, :worker_number, :job_block
      def self.set_query(&block)
        self.query_block = block
      end
    
      def self.set_worker_number(number = 10)
        self.worker_number = number
      end
    
      def self.set_job(&block)
        self.job_block = block
      end
    
      def self.perform_batch_later
        query = query_block.call
        argu_group = QueryThread.split_to_array(query, worker_number || 10)
        argu_group.each do |argus|
          self.perform_later(argus)
        end
      end
    
      def perform(options = {})
        min_id = options.fetch(:min_id)
        max_id = options.fetch(:max_id)
    
        q = self.class.query_block.call
        subquery = q.where("#{q.table_name}.id >= ? and #{q.table_name}.id <= ?", min_id, max_id)
        self.class.job_block.call(subquery, options)
      end
    
    end
    

    Usage: write a job that inherits from BatchJob.

    • set_query defines the query.
    • set_worker_number sets how many jobs the work is split into (they can run concurrently).
    • set_job defines the logic applied to each chunk of query results.
    • To run it, just call ExportDataJob.perform_batch_later. The example below splits the query into 40 jobs that can export in parallel; afterwards you merge the pieces based on the Attachment records (see the sketch after the job class).

    class ExportDataJob < BatchJob
      set_query {
        Customer.where("updated_at > ?", Time.parse("Tue Apr 3 14:08:33 2018 -0500"))
      }
    
      set_worker_number 40
      set_job do |query, thread_hash|
        Attachment.upload_csv!(Rails.root.join("tmp/fix_grandfater_#{thread_hash[:index]}.csv"), "wb") do |csv|
          query.includes(:user).find_each(batch_size: 100) do |customer|
            csv << [customer.id, customer.name, ...] # fill in your own columns here
          end
        end
      end
    
    end
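
    Merging the pieces is not shown above; below is a minimal sketch, assuming you keep track of the ids of the Attachments created by the 40 jobs (the helper name and the id bookkeeping are hypothetical, and if each file has a header row you would skip it for every file but the first):

    # Hypothetical helper, not part of the classes above: concatenates the
    # per-job CSV files into a single file and stores it as a new Attachment.
    def merge_export_attachments(attachment_ids, target = Rails.root.join("tmp/export_merged.csv"))
      File.open(target, "wb") do |out|
        Attachment.where(id: attachment_ids).order(:id).each do |attachment|
          out.write(attachment.file.read) # same CarrierWave API that CsvJob#perform uses
        end
      end
      Attachment.upload_file!(target, "export_merged")
    end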
    

    Using the same idea, you can write a similar job for imports:

    class CsvJob < ApplicationJob
      queue_as :default
    
      class_attribute :worker_number, :job_block
    
      def self.set_worker_number(number = 10)
        self.worker_number = number
      end
    
      def self.set_job(&block)
        self.job_block = block
      end
    
      # CsvJob.perform_batch_later(123, csv: {headers: true}, job: {store: "123"})
      def self.perform_batch_later(attachment_id, options = {})
        options[:csv] ||= {}
        csv_data = Attachment.find(attachment_id).file.read
        total_lines = CSV.parse(csv_data, options[:csv]).count
    
        worker_group = QueryThread.split_range(0, total_lines - 1, worker_number || 10)
        worker_group.each do |worker_options|
          self.perform_later(attachment_id, options, worker_options)
        end
      end
    
      def perform(attachment_id, options = {}, worker_options = {})
        temp_filename = Rails.root.join("tmp/temp_csv_#{attachment_id}_#{worker_options[:index]}.csv")
        File.open(temp_filename, "wb") do |f|
          f.write Attachment.find(attachment_id).file.read
        end
    
        csv = CSV.foreach(temp_filename, options[:csv])
    
        self.class.job_block.call csv, worker_options, options[:job]
      end
    end
    

    Usage:

    • set_worker_number sets how many jobs the import is split into.
    • set_job defines the logic; here each row's index is compared against worker_options[:min_id] and worker_options[:max_id] by hand, which is a bit ugly.

    To call it, pass the id of the Attachment that holds the CSV file plus any other custom options: ImportCustomerJob.perform_batch_later(attachment_id, csv: {headers: true}, job: {store: "stage"})

    
    class ImportCustomerJob < CsvJob
    
      set_worker_number 20
    
      set_job do |csv, worker_options, job_options|
        job_options ||= {}
    
        Attachment.log("import_#{job_options[:store]}_#{worker_options[:index]}") do |logger|
          logger.info "start.."
          index = 0
          csv.each do |row|
    
            if index < worker_options[:min_id]
              index += 1
              next
            end
            if index > worker_options[:max_id]
              break
            end
    
            Customer.create_or_update_by!(store: job_options[:store], remote_id: row["remote_id"]) do |c|
              c.attributes = row.to_h
            end
    
            index += 1
          end
          logger.info "finish.."
        end
      end
    end
    

    Finally, here is Attachment, written rather casually; it just provides persistence.

    class Attachment < ApplicationRecord
      mount_uploader :file, S3FileUploader
    
    
      def self.upload_file!(filename, name = nil)
        a = Attachment.new
        file = File.open(filename, "r")
        a.file = file
        a.name = name
        a.save!
        a
      ensure
        file.close if file
      end
    
      def self.upload_csv!(filename, csv_options, &block)
        CSV.open(filename, csv_options) do |csv|
          block.call(csv)
        end
      ensure
        upload_file!(filename)
      end
    
      # todo add log and upload to s3
      def self.log(name, &block)
        log_file = File.open(Rails.root.join("tmp/#{name}.log"), "w")
        log_file.sync = true
        logger = Logger.new(log_file)
        logger.formatter = proc{|severity, time, program, msg|  "#{severity}: #{msg}\n" }
    
        begin
          block.call(logger)
        rescue Exception => e
          logger.error e.message
          logger.error e.backtrace.join("\n")
          raise e
        ensure
          log_file.close
          Attachment.upload_file!(Rails.root.join("tmp/#{name}.log"), name)
        end
      end
    end
    

    This way imports and exports work without modifying Sidekiq, and you don't have to worry about managing threads yourself. Everything is recorded along the way, and if something raises, Sidekiq will retry the job (usually it means your own logic has a problem). That meets your requirements.
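
    If you want finer control over retries than the adapter's defaults, ActiveJob lets you declare it near the top of BatchJob / CsvJob; a minimal sketch, assuming Rails 5.1+ (the error class, wait time and attempt count are only placeholders):

    class BatchJob < ApplicationJob
      queue_as :default

      # retry transient failures a few times before letting the error surface
      retry_on ActiveRecord::Deadlocked, wait: 5.seconds, attempts: 3
      # drop jobs whose serialized record arguments have since been deleted
      discard_on ActiveJob::DeserializationError

      # ... rest of the class as defined above
    end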

  • For the rows that appear only once under the condition from = a: T.where("from = ?", "a").group("to").having("count(*) = 1").pluck("to")

    If from = a, to = xx and from = xx, to = a should count as a single row, then something like T.where("not exists (select * from T t2 where t2.from = t.to and t2.to = t.from)").group("from, to").having("count(*) = 1").pluck("from, to"); note that the outer query needs a table alias (t) for the correlated subquery to work, as in the sketch below.
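
    A rough sketch of that second query with the alias spelled out (a hypothetical rewrite; T, from and to are the names from the question, and the backquotes are only there because from and to are reserved words in MySQL):

    T.from("#{T.table_name} t")
     .where("not exists (select 1 from #{T.table_name} t2
                         where t2.`from` = t.`to` and t2.`to` = t.`from`)")
     .group("t.`from`, t.`to`")
     .having("count(*) = 1")
     .pluck("t.`from`", "t.`to`")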

  • @huacnlee could we get a "downvote" button for posts? Some of them are really painful to look at.

  • Beware of Bitcoin and blockchain scams, at January 25, 2018

    👏

  • Thanks! 😀

  • Where did you get the light on top of your monitor? It looks really nice.

  • Wrap a <tbody> layer around the outside.