Python实现 多进程导入CSV数据到 MySQL

 更新时间:2017年7月6日 23:49  点击:1357
小编推荐的这篇文章介绍了Python实现 多进程导入CSV数据到 MySQL的教程,非常实用,有兴趣的同学快来看看吧

前段时间帮同事处理了一个把 CSV 数据导入到 MySQL 的需求。两个很大的 CSV 文件, 分别有 3GB、2100 万条记录和  7GB、3500 万条记录。对于这个量级的数据,用简单的单进程/单线程导入  会耗时很久,最终用了多进程的方式来实现。具体过程不赘述,记录一下几个要点:

  1. 批量插入而不是逐条插入

  2. 为了加快插入速度,先不要建索引

  3. 生产者和消费者模型,主进程读文件,多个 worker 进程执行插入

  4. 注意控制 worker 的数量,避免对 MySQL 造成太大的压力

  5. 注意处理脏数据导致的异常

  6. 原始数据是 GBK 编码,所以还要注意转换成 UTF-8

  7. 用 click 封装命令行工具

具体的代码实现如下:

 

 代码如下复制代码

#!/usr/bin/env python

# -*- coding: utf-8 -*-

 

importcodecs

importcsv

importlogging

importmultiprocessing

importos

importwarnings

 

importclick

importMySQLdb

importsqlalchemy

 

warnings.filterwarnings('ignore', category=MySQLdb.Warning)

 

# 批量插入的记录数量

BATCH=5000

 

DB_URI='mysql://root@localhost:3306/example?charset=utf8'

 

engine=sqlalchemy.create_engine(DB_URI)

 

 

defget_table_cols(table):

  sql='SELECT * FROM `{table}` LIMIT 0'.format(table=table)

  res=engine.execute(sql)

  returnres.keys()

 

 

definsert_many(table, cols, rows, cursor):

  sql='INSERT INTO `{table}` ({cols}) VALUES ({marks})'.format(

      table=table,

      cols=', '.join(cols),

      marks=', '.join(['%s']*len(cols)))

  cursor.execute(sql,*rows)

  logging.info('process %s inserted %s rows into table %s', os.getpid(),len(rows), table)

 

 

definsert_worker(table, cols, queue):

  rows=[]

  # 每个子进程创建自己的 engine 对象

  cursor=sqlalchemy.create_engine(DB_URI)

  whileTrue:

    row=queue.get()

    ifrowisNone:

      ifrows:

        insert_many(table, cols, rows, cursor)

      break

 

    rows.append(row)

    iflen(rows)==BATCH:

      insert_many(table, cols, rows, cursor)

      rows=[]

 

 

definsert_parallel(table, reader, w=10):

  cols=get_table_cols(table)

 

  # 数据队列,主进程读文件并往里写数据,worker 进程从队列读数据

  # 注意一下控制队列的大小,避免消费太慢导致堆积太多数据,占用过多内存

  queue=multiprocessing.Queue(maxsize=w*BATCH*2)

  workers=[]

  foriinrange(w):

    p=multiprocessing.Process(target=insert_worker, args=(table, cols, queue))

    p.start()

    workers.append(p)

    logging.info('starting # %s worker process, pid: %s...', i+1, p.pid)

 

  dirty_data_file='./{}_dirty_rows.csv'.format(table)

  xf=open(dirty_data_file,'w')

  writer=csv.writer(xf, delimiter=reader.dialect.delimiter)

 

  forlineinreader:

    # 记录并跳过脏数据: 键值数量不一致

    iflen(line) !=len(cols):

      writer.writerow(line)

      continue

 

    # 把 None 值替换为 'NULL'

    clean_line=[Noneifx=='NULL'elsexforxinline]

 

    # 往队列里写数据

    queue.put(tuple(clean_line))

    ifreader.line_num%500000==0:

      logging.info('put %s tasks into queue.', reader.line_num)

 

  xf.close()

 

  # 给每个 worker 发送任务结束的信号

  logging.info('send close signal to worker processes')

  foriinrange(w):

    queue.put(None)

 

  forpinworkers:

    p.join()

 

 

defconvert_file_to_utf8(f, rv_file=None):

  ifnotrv_file:

    name, ext=os.path.splitext(f)

    ifisinstance(name,unicode):

      name=name.encode('utf8')

    rv_file='{}_utf8{}'.format(name, ext)

  logging.info('start to process file %s', f)

  withopen(f) as infd:

    withopen(rv_file,'w') as outfd:

      lines=[]

      loop=0

      chunck=200000

      first_line=infd.readline().strip(codecs.BOM_UTF8).strip()+'\n'

      lines.append(first_line)

      forlineininfd:

        clean_line=line.decode('gb18030').encode('utf8')

        clean_line=clean_line.rstrip()+'\n'

        lines.append(clean_line)

        iflen(lines)==chunck:

          outfd.writelines(lines)

          lines=[]

          loop+=1

          logging.info('processed %s lines.', loop*chunck)

 

      outfd.writelines(lines)

      logging.info('processed %s lines.', loop*chunck+len(lines))

 

 

@click.group()

defcli():

  logging.basicConfig(level=logging.INFO,

            format='%(asctime)s - %(levelname)s - %(name)s - %(message)s')

 

 

@cli.command('gbk_to_utf8')

@click.argument('f')

defconvert_gbk_to_utf8(f):

  convert_file_to_utf8(f)

 

 

@cli.command('load')

@click.option('-t','--table', required=True,help='表名')

@click.option('-i','--filename', required=True,help='输入文件')

@click.option('-w','--workers', default=10,help='worker 数量,默认 10')

defload_fac_day_pro_nos_sal_table(table, filename, workers):

  withopen(filename) as fd:

    fd.readline() # skip header

    reader=csv.reader(fd)

    insert_parallel(table, reader, w=workers)

 

 

if__name__=='__main__':

  cli()

 

小编给大家推荐的这篇文章介绍了简单谈谈PHP中的trait的教程,非常实用,有兴趣的同学快来看看吧

前言

之前的一个同事换工作,在面试被问到了 PHP 的 trait 。因为没用过, 所以没答好,我大概是用过几次的,想了想整理了以下的总结。

trait

trait 是在一些类(Class)的应该具备的特定的属性或方法,而同父级的另外一些类应该避免包含这些属性和方法情况下使用的.

当然, 这也和开发者对类的抽象能力有关, 有些抽象能力好的, 可以减少对 trait 的使用 但是这种情况应该是无法避免的 不然 trait 出现就毫无意义了.

还有一种情况, 就是使用 trait 的时候, 可以起到的约束开发者的作用, 提醒开发者注意需要在开发的过程中调用 trait 的某些属性和方法.

同事则提出了一个好问题, 接口(interface) 不也是这个作用么?

不急, 让我们先看个例子:

比如你要收集网站上各类数据, 开发了 Spider 类. Spider有个方法叫request()负责请求.

 

 代码如下 复制代码

<?phpnamespaceXWSoul\Network;

classSpider

{

 publicfunctionrequest($url)

 {

 //do sth.

 }

}

 

但是采集数据的过程中, 有些网站对蜘蛛敏感有些则不. 对于敏感的网站, 我们给出了一个使用代理的解决方案. 但是使用代理是会影响抓取速度的. 这就产生了 Spider 的子类有些需要用代理, 而能不用代理则尽量不用的情况.

于是这个时候我们新增了一个 trait Proxy:

 

 代码如下 复制代码

<?phpnamespaceXWSoul\Network;

trait Proxy

{

 

 protected$isProxy= false;

 

 publicfunctionuseProxy($proxy)

 {

 //do sth proxy setups.

 $this->isProxy = true;

 return$this;

 }

 

 publicfunctionrequest($url)

 {

 if(!$this->isProxy) {

  thrownewException("Please using proxy.");

 }

 //do sth.

 returnparent::request($url);

 }

}

 

trait 重写了 Spider 的request()方法, 限定了在没有调用代理的情况下调用会抛出异常.

回到之前的问题, trait 这样的用法和 接口(interface) 有什么区别?

接口的约束是前置的是定义初始就必须实现的, 他可以约束方法的实现却无法约束方法的调用, trait 是一种后置的调用, 他已经实现了方法,  关键的是, 他只对调用了自身的类产生约束(废话一句), 而对没有调用自身的类不产生影响(再一句废话), 同时他是可复用的, 而且没有破坏  Spider 类自身的实现增加, Spider 还是那个 Spider.

我想 trait 的用法再这里已经很有效了吧.

后话

有人可能决定 另外实现一个 request 比如叫, proxyRequst 不就完了么? 你说的好有道理&hellip;然是如果我使用了不一样的 代理具体对请求上有细节差异怎么办呢? 在代码里不停的 if if if 么? trait 如此清爽的方案 为何要放弃呢?

PHP实现批量删除效果也是很多用户会遇到的,这里文章就给大家介绍下PHP怎么实现批量删除,有些什么实现方法,感兴趣的下面就具体来看看。

前台

 

  

  

 代码如下复制代码

<!DOCTYPE html>

<html>

<head>

  <title>批量删除</title>

</head>

<body>

<scripttype="text/javascript">

  

//复选框

function checkall(all)

{

  var ck = document.getElementsByClassName("ck");

  

  if(all.checked)

  {

   for(var i=0;i<ck.length;i++)

   {

     ck[i].setAttribute("checked","checked");

   }

  }

  else

  {

   for(vari=0;i<ck.length;i++)

   {

     ck[i].removeAttribute("checked");

   }

  }

}

</script>

  

<formaction="test.php"method="post">

<tableborder="1">

  <tr><th><inputtype="checkbox"name="all"onclick="checkall(this)"/>id</th><th>名字</th></tr>

  

<!-- 此处调用显示列表函数 -->

<?phpshow() ?>

  

<tr><tdcolspan="3"><inputtype="submit"value="批量删除"></td></tr>

</table>

</form>

</body>

  

<?php 

  

//显示列表

function show()

{

  //连接数据库

  @mysql_connect('localhost','root','');

  mysql_select_db('test');

  mysql_query('set names utf8');

  

  $sql="select id,name from test";

  $res=mysql_query($sql);

  

  //循环取出数据

   while($row=mysql_fetch_row($res))

    {

     echo "<tr>

      <td>

       <inputtype='checkbox'value='{$row[0]}'name='item[]'class='ck'/>

       {$row[0]}

     </td>

      <td>{$row[1]}</td>

    </tr>";

    }

  }

?>

</html>


id名字


后台

 

 代码如下复制代码

<?php 

  

//接收post传来的数组

$arr=$_POST["item"];

  

/**

* 批量删除 

* 思路:把前台批量选择的数据放在数组里,删除该数组即可 

* @param $arr 

* @return $res 成功or失败

*/

functionbatch_del($arr)

{

  @mysql_connect('localhost','root','');

  mysql_select_db('test');

  mysql_query('set names utf8');

  

  //把数组元素组合为字符串:

  $str= implode("','",$arr);

  //in 表示多个

  $sql="delete from test where id in('{$str}')";

  $res= mysql_query($sql);

  

  if(!$res){

      echo"删除失败";

    }else{

  

      if(mysql_affected_rows()>0){

        echo"删除成功";

      }else{

        echo"没有行受到影响";  

      }

    } 

  }

  

//调用批量删除函数

  

batch_del($arr);

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助。

本文介绍了PHP函数rtrim()使用中的怪异现象分析,非常实用,有兴趣的同学快来看看吧。

今天用rtrim()函数时遇到了一个奇怪的问题:

 

 代码如下 复制代码

echortrim('<p></div>','</div>');// 输出为 <p

echoltrim('www.jb51.net','www.');// 输出为 jb51.net

 

以上的输出结果有点出人意料,本来我想第一行应该输出<p>,第二行会输出jb51.net。

这个问题纠结了我好久,一直没有找到原因,后来在手册中找到了答案:

rtrim()是以字符为单位替换,而不是以字符串的。从右往左替换时</div>6个字符肯定会被替换掉的,再往左时遇到了>,因为>也包含在rtirm()的第二个参数的字符串(</div>)中,所以也被替换掉了,当再往左时遇到了p,这时p不包含在第二个参数的字符串中。所以替换停止,输出了<p。

如果这样理解的话,第二行的输出结果就是在意料之中了。呵呵&hellip;&hellip;手册中已经写的清清楚楚了。原文:

You can also specify the characters you  want to strip, by means of the charlist parameter. Simply list all  characters that you want to be stripped. With .. you can specify a range  of characters。

由此可见,rtrim、ltrim与trim第二个参数是作为一组字符列表进行匹配操作的。这与我们以往认识的str_replace函数的替换操作不一样。

[!--infotagslink--]

相关文章

  • python opencv 画外接矩形框的完整代码

    这篇文章主要介绍了python-opencv-画外接矩形框的实例代码,代码简单易懂,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下...2021-09-04
  • Python astype(np.float)函数使用方法解析

    这篇文章主要介绍了Python astype(np.float)函数使用方法解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下...2020-06-08
  • 最炫Python烟花代码全解析

    2022虎年新年即将来临,小编为大家带来了一个利用Python编写的虎年烟花特效,堪称全网最绚烂,文中的示例代码简洁易懂,感兴趣的同学可以动手试一试...2022-02-14
  • python中numpy.empty()函数实例讲解

    在本篇文章里小编给大家分享的是一篇关于python中numpy.empty()函数实例讲解内容,对此有兴趣的朋友们可以学习下。...2021-02-06
  • Python 图片转数组,二进制互转操作

    这篇文章主要介绍了Python 图片转数组,二进制互转操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2021-03-09
  • python-for x in range的用法(注意要点、细节)

    这篇文章主要介绍了python-for x in range的用法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2021-05-10
  • Python中的imread()函数用法说明

    这篇文章主要介绍了Python中的imread()函数用法说明,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2021-03-16
  • python实现b站直播自动发送弹幕功能

    这篇文章主要介绍了python如何实现b站直播自动发送弹幕,帮助大家更好的理解和学习使用python,感兴趣的朋友可以了解下...2021-02-20
  • python Matplotlib基础--如何添加文本和标注

    这篇文章主要介绍了python Matplotlib基础--如何添加文本和标注,帮助大家更好的利用Matplotlib绘制图表,感兴趣的朋友可以了解下...2021-01-26
  • 解决python 使用openpyxl读写大文件的坑

    这篇文章主要介绍了解决python 使用openpyxl读写大文件的坑,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2021-03-13
  • python 计算方位角实例(根据两点的坐标计算)

    今天小编就为大家分享一篇python 计算方位角实例(根据两点的坐标计算),具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2020-04-27
  • python实现双色球随机选号

    这篇文章主要为大家详细介绍了python实现双色球随机选号,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...2020-05-02
  • python中使用np.delete()的实例方法

    在本篇文章里小编给大家整理的是一篇关于python中使用np.delete()的实例方法,对此有兴趣的朋友们可以学习参考下。...2021-02-01
  • 使用Python的pencolor函数实现渐变色功能

    这篇文章主要介绍了使用Python的pencolor函数实现渐变色功能,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下...2021-03-09
  • python自动化办公操作PPT的实现

    这篇文章主要介绍了python自动化办公操作PPT的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2021-02-05
  • Python getsizeof()和getsize()区分详解

    这篇文章主要介绍了Python getsizeof()和getsize()区分详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2020-11-20
  • PyTorch一小时掌握之迁移学习篇

    这篇文章主要介绍了PyTorch一小时掌握之迁移学习篇,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下...2021-09-08
  • python实现学生通讯录管理系统

    这篇文章主要为大家详细介绍了python实现学生通讯录管理系统,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...2021-02-25
  • 解决python 两个时间戳相减出现结果错误的问题

    这篇文章主要介绍了解决python 两个时间戳相减出现结果错误的问题,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2021-03-12
  • Python绘制的爱心树与表白代码(完整代码)

    这篇文章主要介绍了Python绘制的爱心树与表白代码,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下...2021-04-06