当前位置:首页 > 开发教程 > 数据库 >

从MySQL和PG数据库中提取数据,并将其整合到一个单一的目标存储

时间:2016-02-18 15:59 来源:互联网 作者:源码搜藏 收藏

Square Dbsync 一种提取和加载系统分流数据库之间的数据。 它采用基于时间戳的复制是快速和易于保持运行,但有一些注意事项。 最值得注意的是,它不处理以及删除(详情请参见下面的文档)。 这在广场,因为我们需要的部分(仅限于选择列),无论从 MySQL 和P

Square Dbsync

一种提取和加载系统分流数据库之间的数据。

它采用基于时间戳的复制是快速和易于保持运行,但有一些注意事项。最值得注意的是,它不处理以及删除(详情请参见下面的文档)。

这在广场,因为我们需要的部分(仅限于选择列),无论从MySQL和PostgreSQL数据库,连续复制到一个目标数据库与沿途的一些基本的ETL逻辑是对我们有用。现有的解决方案中没有能够充分地做到这一点。

在某些时候,你需要硬着头皮实现真正的ETL系统,但是SQ-dbsync可你渡过难关,直到你到达那里。

dbsync是MySQL utf8mb4干净的:它会正确处理四字节的UTF8字符像表情图案。在JRuby中,你需要配置到服务器的字符集utf8mb4该规格包括这个试验; 如果您在JRuby下具有不同的服务器字符集运行它们的MySQL他们就会失败。

用法


gem install sq-dbsync
require 'sq/dbsync'

include Sq::Dbsync

# Config will typically differ per environment.
config = {
  sources: {
    db_a: {
      database: 'db_a_production',
      user:     'sqdbsync-ro',
      password: 'password',
      host:     'db-a-host',
      brand:    'mysql',
      port:     3306,
    },
    db_b: {
      database: 'db_b_production',
      user:     'sqdbsync-ro',
      password: 'password',
      host:     'db-b-host',
      brand:    'postgresl',
      port:     5432,
    }
  },
  target: {
    database: 'replica',
    user:     'sqdbsync',
    password: 'password',

    # Only localhost supported, since `LOAD DATA INFILE` is used which
    # requires a shared temp directory.
    host:     'localhost',
    brand:    'mysql',
    port:     3306,
  },

  # Optional configuration
  logger: Loggers::Stream.new,     # A graphite logger is provided, see source.
  clock: ->{ Time.now.utc },       # In test env it can be useful to fix this.
  error_handler: ->(e) { puts(e) } # Notify your exception system
}

# Write plans that specify how data is replicated.
DB_A_PLAN = [{
  table_name: :users,
  columns: [
    # You must replicate the primary key.
    :id,

    # You must replicate a timestamp column, and it should be indexed on the
    # target system.
    :updated_at,

    # Then whatever other columns you require.
    :name,
    :account_type,
    :created_at,

  ],
  indexes: {
    # Indexing it on the source system is optional
    index_users_on_updated_at: {:columns=>[:updated_at], :unique=>false},
  },

  # Basic schema transformations are supported.
  db_types: {
    :account_type => [:enum, %w(
      bronze
      silver
      gold
    )]
  }
},{
  table_name: :account_types,
  source_table_name: :user_account_types,
  columns: :all
}]

plans = [
  [StaticTablePlan.new(DB_A_PLAN), :db_a],
  [AllTablesPlan.new, :db_b]
]

manager = Manager.new(config, plans)

# Run a batch load nightly
manager.batch(ALL_TABLES)

# Run an incremental load continuously
manager.increment

# You can load a subset of tables if necessary
manager.batch([:users])

文档

计划选项

  • batch_load是否批次负载该表中的默认批次负载。如果该表明确要求,将无论此设置加载。(默认:true)
  • 字符集字符集,以创建表时使用。通过直接传递到 续集:: MySQL的数据库::#连接只有MySQL的,忽视了Postgres的。(默认:“UTF8”)
  • 要么列的阵列复制,或:所有指示所有列应该被复制。(需要)
  • 一致性桌子上定期增量加载过程中,通过比较最近的源和目标表的计数执行基本的一致性检查。请确保您有两个表时间戳指数!开发该项目时,这是非常有用的,但老实说,现在可能是没有多大用处的---我不记得我最后一次看到错误来自于此。(默认值:false)
  • db_types哈希,使您可以从源修改目标模式。见上面用法部分的示例。(默认:{} 
  • 索引散列定义目标表所需的索引。索引是 不是从源表复制。见例如上面的章节。(默认:{} 
  • refresh_recent一些表太大而批量负载经常,但修改已知是最近。此设置将导致数据的最后两天被丢弃的重建为夜间批处理负载的一部分。(默认值:false)
  • source_table_name允许源和目标表进行不同的命名。(默认:table_name的配置选项)
  • timestamp_table_name一个黑客要解决Postgres的查询规划无法正确使用索引MAX()上使用视图UNION 在幕后。如果这个描述你的源视图和基础表之一是保证包含您可以设置此值,最新的记录,它将被用于所有时间戳相关查询。如果没有,你必须提供一个支持自定义视图MAX有一个健全的查询计划查询。(默认值:无)
  • table_name的要复制的表的名称。如果source_table_name 指定时,这个选项定义了在仅目标数据库中的表的名称。
  • primary_key通常主键可以从源模式推断,但如果你是从视图复制,您需要使用此选项显式地指定它。应符号的阵列。(默认值:无,从源代码模式将自动检测)
  • 时间戳来当作一个时间戳列。必须是会员 :列选项。(默认值:选择的updated_atcreated_at,顺序)

处理删除

增量负荷没有检测删除的记录方式。夜间批处理负荷将重新加载所有的表,所以会有最多为期一天的周转上删除。有些表会过大,批量加载每天晚上然而,因此这不是在这种情况下很好的解决方案。

如果你有一个包含足够的数据,为您重建其他表中删除了“审计”表,那么你可以提供一个自定义子类增量加载器将能够运行这个逻辑。


	
class IncrementalLoadWithDeletes < Sq::Dbsync::IncrementalLoadAction
  def process_deletes
    if plan.table_name == :audit_logs
      ExampleRecordDestroyer.run(db, registry, :audit_logs, :other_table)
    end
  end
end

CONFIG = {
  # ...
  incremental_action: IncrementalLoadWithDeletes,
}

lib/sq/dbsync/example_record_destroyer的一个样本实现。

数据库设置

如果你的目标数据库是MySQL,我们建议您确保它的下运行READ COMMITTED隔离级别。这使得它更难的分析师锁定表和块复制。(之类的语句CREATE TABLE AS SELECT FROM ...往往是罪魁祸首。)

实例下载地址https://github.com/square/sq-dbsync/archive/master.zip


数据库阅读排行

最新文章