<?xml version="1.0"?>
<feed xmlns="http://www.w3.org/2005/Atom" xml:lang="zh-Hans-CN">
	<id>https://wiki.riguz.com/index.php?action=history&amp;feed=atom&amp;title=Blog%3A%E4%B8%80%E4%B8%AA%E7%AE%80%E5%8D%95%E7%9A%84ETL%E7%A8%8B%E5%BA%8F</id>
	<title>Blog:一个简单的ETL程序 - 版本历史</title>
	<link rel="self" type="application/atom+xml" href="https://wiki.riguz.com/index.php?action=history&amp;feed=atom&amp;title=Blog%3A%E4%B8%80%E4%B8%AA%E7%AE%80%E5%8D%95%E7%9A%84ETL%E7%A8%8B%E5%BA%8F"/>
	<link rel="alternate" type="text/html" href="https://wiki.riguz.com/index.php?title=Blog:%E4%B8%80%E4%B8%AA%E7%AE%80%E5%8D%95%E7%9A%84ETL%E7%A8%8B%E5%BA%8F&amp;action=history"/>
	<updated>2026-06-02T19:45:54Z</updated>
	<subtitle>本wiki上该页面的版本历史</subtitle>
	<generator>MediaWiki 1.42.3</generator>
	<entry>
		<id>https://wiki.riguz.com/index.php?title=Blog:%E4%B8%80%E4%B8%AA%E7%AE%80%E5%8D%95%E7%9A%84ETL%E7%A8%8B%E5%BA%8F&amp;diff=2587&amp;oldid=prev</id>
		<title>imported&gt;Riguz：​这两天闲着没事准备玩一下社工库，网上有很多以前的社工库（这些都可以下载到，但是实际上已经没有什么太大的价值了，因为暴露时间太久，以及相关的网站都已经做了处理，所以别指望能够找到什么有价值的东西），通过社工库可以了解到的一个实际的数据就是，用户的设置的密码大都是什么样子的。我准备看一下搜云社工库，这个库大概4亿多条数据，主要目的是实践一下大量数据的处理。</title>
		<link rel="alternate" type="text/html" href="https://wiki.riguz.com/index.php?title=Blog:%E4%B8%80%E4%B8%AA%E7%AE%80%E5%8D%95%E7%9A%84ETL%E7%A8%8B%E5%BA%8F&amp;diff=2587&amp;oldid=prev"/>
		<updated>2020-05-04T00:00:00Z</updated>

		<summary type="html">&lt;p&gt;这两天闲着没事准备玩一下社工库，网上有很多以前的社工库（这些都可以下载到，但是实际上已经没有什么太大的价值了，因为暴露时间太久，以及相关的网站都已经做了处理，所以别指望能够找到什么有价值的东西），通过社工库可以了解到的一个实际的数据就是，用户的设置的密码大都是什么样子的。我准备看一下搜云社工库，这个库大概4亿多条数据，主要目的是实践一下大量数据的处理。&lt;/p&gt;
&lt;p&gt;&lt;b&gt;新页面&lt;/b&gt;&lt;/p&gt;&lt;div&gt;这两天闲着没事准备玩一下社工库，网上有很多以前的社工库（这些都可以下载到，但是实际上已经没有什么太大的价值了，因为暴露时间太久，以及相关的网站都已经做了处理，所以别指望能够找到什么有价值的东西），通过社工库可以了解到的一个实际的数据就是，用户的设置的密码大都是什么样子的。我准备看一下搜云社工库，这个库大概4亿多条数据，主要目的是实践一下大量数据的处理。&lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
= 还原原始数据库=&lt;br /&gt;
这个库（下载地址请自行搜索）下载完成之后是一个`1.bak`（SQL SERVER的备份文件）文件，接近30G，网上有详细的教程如何导入，总结起来大致如下:&lt;br /&gt;
&lt;br /&gt;
* 安装SQL SERVER 2008 R2（标准版或者更高）&lt;br /&gt;
* 将1.bak复制到SQL SERVER的实例备份目录，否则会无法导入&lt;br /&gt;
* 因为还原后的数据库占用空间在130G，所以要保证数据库的存储磁盘空间足够&lt;br /&gt;
&lt;br /&gt;
还原完成之后即可得到一个sgk的数据表，里面有423771078条数据。&lt;br /&gt;
&lt;br /&gt;
= 导入到MySQl=&lt;br /&gt;
因为最终希望能够在MYSQL中使用数据，所以还得将数据迁移到MySQL中。最初的想法是直接将数据导出到csv，然后再通过MySQL导入csv。但是这个方法尝试了几次之后发现一些问题：&lt;br /&gt;
&lt;br /&gt;
* 数据中可能存在一些非可见字符，导致导出后的csv格式混乱&lt;br /&gt;
* 莫名其妙的问题无法解析&lt;br /&gt;
&lt;br /&gt;
索性就直接放弃了这种方式，而是通过手写ETL来完成迁移。于是设计了一个简陋的ETL程序，还挺有意思的。&lt;br /&gt;
&lt;br /&gt;
== 整体架构==&lt;br /&gt;
&lt;br /&gt;
ETL的主要目的是把数据从一个地方转移迁移到另一个地方，这其中可能进行一些其他的操作比如清洗或者格式转换。整体的思路是这样的：&lt;br /&gt;
&lt;br /&gt;
* 有一个source（源数据）和destination（目标数据源）&lt;br /&gt;
* ETL执行的时候从源数据查询数据，然后保存到目标数据库中，为了提高效率，每次会查询一批数据，保存的时候也会一批一批保存&lt;br /&gt;
* 为进一步提高性能，ETL可以设定多个线程同时处理，为了避免冲突，每个线程不会处理同一条数据，是互斥的&lt;br /&gt;
&lt;br /&gt;
== ETL Engine==&lt;br /&gt;
&lt;br /&gt;
&amp;lt;syntaxhighlight lang=&amp;quot;java&amp;quot;&amp;gt;&lt;br /&gt;
public class Engine {&lt;br /&gt;
    private static final Logger logger = LoggerFactory.getLogger(Engine.class);&lt;br /&gt;
&lt;br /&gt;
    private final int threads;&lt;br /&gt;
    private final int totalCount;&lt;br /&gt;
    private final int batchSize;&lt;br /&gt;
&lt;br /&gt;
    private final HikariDataSource source;&lt;br /&gt;
    private final HikariDataSource destination;&lt;br /&gt;
&lt;br /&gt;
    private final AtomicLong rangeSelector = new AtomicLong(0);&lt;br /&gt;
&lt;br /&gt;
    private final List&amp;lt;Thread&amp;gt; tasks = new ArrayList&amp;lt;&amp;gt;();&lt;br /&gt;
&lt;br /&gt;
    public Engine(int threads, int totalCount, int batchSize) {&lt;br /&gt;
        this.threads = threads;&lt;br /&gt;
        this.totalCount = totalCount;&lt;br /&gt;
        this.batchSize = batchSize;&lt;br /&gt;
        this.source = new HikariDataSource(new HikariConfig(&amp;quot;src/main/resources/source.properties&amp;quot;));&lt;br /&gt;
        this.destination = new HikariDataSource(new HikariConfig(&amp;quot;src/main/resources/destination.properties&amp;quot;));&lt;br /&gt;
    }&lt;br /&gt;
&amp;lt;/syntaxhighlight&amp;gt;&lt;br /&gt;
&lt;br /&gt;
这个ETL引擎担负管理任务的角色，有这些参数：&lt;br /&gt;
&lt;br /&gt;
* threads: 可以设定多少个线程同时运行；&lt;br /&gt;
* totalCount: 设定一个totalCount作为任务结束的条件（当然也可以在查询不到数据的时候结束)&lt;br /&gt;
* batchSize: 每一批的大小&lt;br /&gt;
* rangeSelector: 用来控制每个线程处理的数据，这里将直接用id来区分&lt;br /&gt;
&lt;br /&gt;
同时直接初始化了两个数据库的连接池，这里选用的是Hikari连接池。&lt;br /&gt;
&lt;br /&gt;
&amp;lt;syntaxhighlight lang=&amp;quot;java&amp;quot;&amp;gt;&lt;br /&gt;
public void run() throws InterruptedException {&lt;br /&gt;
    final CountDownLatch countDownLatch = new CountDownLatch(threads);&lt;br /&gt;
    for (int i = 0; i &amp;lt; threads; i++) {&lt;br /&gt;
        Thread thread = new Thread(new Task(source,&lt;br /&gt;
                destination,&lt;br /&gt;
                totalCount,&lt;br /&gt;
                rangeSelector,&lt;br /&gt;
                batchSize,&lt;br /&gt;
                countDownLatch));&lt;br /&gt;
        tasks.add(thread);&lt;br /&gt;
    }&lt;br /&gt;
    tasks.forEach(Thread::start);&lt;br /&gt;
    countDownLatch.countDown();&lt;br /&gt;
    countDownLatch.await();&lt;br /&gt;
}&lt;br /&gt;
&amp;lt;/syntaxhighlight&amp;gt;&lt;br /&gt;
&lt;br /&gt;
核心逻辑就是，启动N个线程，然后一直等待到每个线程都结束，即完成了ETL任务。&lt;br /&gt;
&lt;br /&gt;
== Task==&lt;br /&gt;
&lt;br /&gt;
Task对应到每个线程，每个线程处理的任务是一样的，只是处理的数据记录不同。&lt;br /&gt;
&lt;br /&gt;
&amp;lt;syntaxhighlight lang=&amp;quot;java&amp;quot;&amp;gt;&lt;br /&gt;
public void run() {&lt;br /&gt;
    logger.info(&amp;quot;ETL task {} running...&amp;quot;, Thread.currentThread().getId());&lt;br /&gt;
    try (Connection sourceConn = source.getConnection();&lt;br /&gt;
         Connection destinationConn = destination.getConnection()) {&lt;br /&gt;
        while (true) {&lt;br /&gt;
            final long startId = rangeSelector.getAndAdd(batchSize);&lt;br /&gt;
            final long endId = startId + batchSize;&lt;br /&gt;
            if (startId &amp;gt; totalCount) {&lt;br /&gt;
                logger.info(&amp;quot;Reached end of records:{} , task finished&amp;quot;, startId);&lt;br /&gt;
                break;&lt;br /&gt;
            }&lt;br /&gt;
            job.doTransfer(sourceConn, destinationConn, startId, endId);&lt;br /&gt;
        }&lt;br /&gt;
    } catch (SQLException ex) {&lt;br /&gt;
        logger.error(&amp;quot;Failed to get data source, ex&amp;quot;);&lt;br /&gt;
    } finally {&lt;br /&gt;
        logger.info(&amp;quot;ETL task {} finished.&amp;quot;, Thread.currentThread().getId());&lt;br /&gt;
        countDownLatch.countDown();&lt;br /&gt;
    }&lt;br /&gt;
&lt;br /&gt;
}&lt;br /&gt;
&amp;lt;/syntaxhighlight&amp;gt;&lt;br /&gt;
&lt;br /&gt;
处理过程也很简单，拿到一个connection之后，根据rangeSelector得到要获取的id范围，然后委派给具体的Job去处理。当超出最大范围的时候，停止该线程。&lt;br /&gt;
&lt;br /&gt;
== Job==&lt;br /&gt;
&lt;br /&gt;
Job对应到具体每条数据该如何传输，会稍微麻烦一点。但本质还是select然后insert，没什么技术含量。&lt;br /&gt;
&lt;br /&gt;
&amp;lt;syntaxhighlight lang=&amp;quot;java&amp;quot;&amp;gt;&lt;br /&gt;
public class SeTransferJob implements TransferJob {&lt;br /&gt;
    private static final Logger logger = LoggerFactory.getLogger(SeTransferJob.class);&lt;br /&gt;
    private static final String query = &amp;quot;select * from sgk where id &amp;gt;? and id &amp;lt;=?&amp;quot;;&lt;br /&gt;
    private static final String insertSqlTemplate = &amp;quot;insert into se_record_%d(id, user_name, email, password, salt, source, remark) values (?,?,?,?,?,?,?);&amp;quot;;&lt;br /&gt;
&lt;br /&gt;
    @Override&lt;br /&gt;
    public void doTransfer(Connection source, Connection target, long startId, long endId) {&lt;br /&gt;
        logger.info(&amp;quot;Transferring records [{}, {}]&amp;quot;, startId, endId);&lt;br /&gt;
&lt;br /&gt;
        try {&lt;br /&gt;
            target.setAutoCommit(false);&lt;br /&gt;
        } catch (SQLException throwables) {&lt;br /&gt;
            throw new RuntimeException(&amp;quot;Cannot open transaction&amp;quot;);&lt;br /&gt;
        }&lt;br /&gt;
        long tableIndex = (startId / 10000000) + 1;&lt;br /&gt;
        String insertSql = String.format(insertSqlTemplate, tableIndex);&lt;br /&gt;
        try (PreparedStatement queryStatement = source.prepareStatement(query);&lt;br /&gt;
             PreparedStatement saveStatement = target.prepareStatement(insertSql)) {&lt;br /&gt;
            queryStatement.setLong(1, startId);&lt;br /&gt;
            queryStatement.setLong(2, endId);&lt;br /&gt;
            try (ResultSet result = queryStatement.executeQuery()) {&lt;br /&gt;
                while (result.next()) {&lt;br /&gt;
                    Record record = Record.from(result);&lt;br /&gt;
                    if (!record.isValid()) {&lt;br /&gt;
                        logger.warn(&amp;quot;Found invaid record:{}&amp;quot;, record);&lt;br /&gt;
                    } else {&lt;br /&gt;
                        record.attach(saveStatement);&lt;br /&gt;
                        saveStatement.addBatch();&lt;br /&gt;
                    }&lt;br /&gt;
                }&lt;br /&gt;
            }&lt;br /&gt;
            saveStatement.executeBatch();&lt;br /&gt;
            target.commit();&lt;br /&gt;
            logger.info(&amp;quot;Records [{}, {}] -&amp;gt; se_record_{} commited&amp;quot;, startId, endId, tableIndex);&lt;br /&gt;
        } catch (SQLException ex) {&lt;br /&gt;
            logger.error(&amp;quot;Unexpected sql error:&amp;quot;, ex);&lt;br /&gt;
            throw new RuntimeException(ex);&lt;br /&gt;
        }&lt;br /&gt;
    }&lt;br /&gt;
}&lt;br /&gt;
&amp;lt;/syntaxhighlight&amp;gt;&lt;br /&gt;
这里因为数据量太大，做了一个分区的处理，直接按照id来进行分区，每个表控制在千万以下。因此导入完成后，最终会有40多个表，每个表有接近1千万的数据。否则单表过大之后的插入性能会变得很低。&lt;br /&gt;
&lt;br /&gt;
以上就是整个ETL的核心思想，还是有一定的扩展性的，哈哈。&lt;/div&gt;</summary>
		<author><name>imported&gt;Riguz</name></author>
	</entry>
</feed>