[함수] Mysql To mongodb
phpclasses.org 에서 가져온 소스입니다
MySqlToMongo.php"
class MySqlToMongo
{
//List of mysql field types
private $mysql_data_type_hash = array(
1=>'tinyint',
2=>'smallint',
3=>'int',
4=>'float',
5=>'double',
7=>'timestamp',
8=>'bigint',
9=>'mediumint',
10=>'date',
11=>'time',
12=>'datetime',
13=>'year',
16=>'bit',
//252 is currently mapped to all text and blob types (MySQL 5.0.51a)
252=>'string',
253=>'varchar',
254=>'char',
246=>'decimal'
);
private $mysqli;
private $mysqli2;
private $mysql_tables;
private $mongo_client;
private $mongo_db;
private $debugMode = false;
private $batchRecords = 10000;
private $dropIfExists = true;
public function __construct()
{
//Your custom code here
}
public function __destruct()
{
try {
@$this->mysqli->close();
}
catch (Exception $e) {}
try {
@$this->mysqli2->close();
}
catch (Exception $e) {}
try {
@$this->mongo_client->close();
}
catch (Exception $e) {}
}
public function setBatchRecords($batchRecords)
{
$this->batchRecords = $batchRecords;
}
public function setDropIfExists($dropIfExists)
{
$this->dropIfExists = $dropIfExists;
}
public function setDebugMode($debugMode) {
$this->debugMode = (boolean) $debugMode;
}
public function mysql_connect($mysql_host, $mysql_user, $mysql_pswd, $mysql_database, $mysql_port = 3306)
{
try {
$this->mysqli = new mysqli($mysql_host, $mysql_user, $mysql_pswd, $mysql_database, $mysql_port);
$this->mysqli->query("SET CHARACTER_SET_RESULTS=utf8");
$this->mysqli->query("SET NAMES utf8");
$this->mysqli2 = new mysqli($mysql_host, $mysql_user, $mysql_pswd, $mysql_database, $mysql_port);
$this->mysqli2->query("SET CHARACTER_SET_RESULTS=utf8");
$this->mysqli2->query("SET NAMES utf8");
}
catch (Exception $e) {
die("\nError connecting to mysql: " . $e->getMessage());
}
}
public function mongodb_connect($mongo_host, $mongo_user, $mongo_pswd, $mongo_database, $mongo_options = array())
{
try {
$this->mongo_client = new MongoClient("mongodb://" . $mongo_user . ":" . $mongo_pswd . "@" . $mongo_host, $mongo_options);
$this->mongo_db = $this->mongo_client->selectDB($mongo_database);
}
catch (Exception $e) {
die("\nError connecting to mongo: " . $e->getMessage());
}
}
public function getMySqlTables()
{
$sql = "SHOW TABLES";
$rs = $this->mysqli->query($sql);
$this->mysql_tables = array();
while ($row = $rs->fetch_array()) {
$this->mysql_tables[] = $row[0];
}
return $this->mysql_tables;
}
public function import($tables)
{
if(!is_array($tables)) throw new Exception("tables is not an array");
$counter = 0;
foreach($tables as $table):
//Create the class reflection
if($this->dropIfExists) {
$this->dropCollection($table);
}
$this->insertValues($table);
$counter++;
// if($counter > 2) {
// break;
// }
endforeach;
}
##################### START PRIVATE METHODS ##############################
private function createIndexes($table, $coll)
{
if(!$this->dropIfExists) {
return true;
}
$sql = "SHOW INDEX FROM ".$table;
$rs = $this->mysqli2->query($sql);
if(!$rs) {
throw new Exception("Error getting index from table $table");
}
// This variable is used to understand
// when the resultset change to another index
$last_field_name = "";
// Array with the list of fields for the current index
$fields = array();
// Options for the current index
$index_options = array();
while($row = $rs->fetch_object()):
if($last_field_name !== $row->Key_name) {
//New index mapping, save previous index if exists
if(!empty($fields)) {
//Save the index to mongodb
$coll->ensureIndex($fields, $index_options);
//reset fields array
$fields = array();
}
}
$last_field_name = $row->Key_name;
//Add the field name
$fields[$row->Column_name] = 1;
//Index setting for mongo
$index_options = array(
"unique" => $row->Non_unique == 0,
"dropDups" => FALSE,
"sparse" => FALSE,
"name" => $row->Key_name
);
endwhile;
//Insert last index if exists
if(!empty($fields)) {
//Save the index to mongodb
$coll->ensureIndex($fields, $index_options);
//reset fields array
$fields = array();
}
}
private function setObjectMapping($finfo, &$obj)
{
foreach($finfo as $field):
$type = @$this->mysql_data_type_hash[$field->type];
if(empty($type)) {
//Mysql field type not found
continue;
}
if($type === "int" || $type === "tinyint" || $type === "smallint") {
$obj->{$field->name} = (int) $obj->{$field->name};
}
else if($type === "date" || $type === "datetime") {
$obj->{$field->name} = new MongoDate(strtotime($obj->{$field->name}));
}
else if($type === "decimal" || $type === "float") {
$obj->{$field->name} = (float) $obj->{$field->name};
}
else if($type === "double") {
$obj->{$field->name} = (double) $obj->{$field->name};
}
endforeach;
}
private function dropCollection($table)
{
$coll = $this->mongo_db->selectCollection($table);
$coll->drop();
return true;
}
private function insertValues($table)
{
echo date("Y-m-d H:i:s")." Insert record for $table ... ";
$this->log("\n");
$coll = $this->mongo_db->selectCollection($table);
$sql = "SELECT * FROM ".$table;
$rs = $this->mysqli->query($sql, MYSQLI_USE_RESULT);
$counter = 0;
//Get fields types for mapping
$finfo = $rs->fetch_fields();
//Variable to check if indexes are created for this collection
$index_created = false;
//List of all elements to insert
$elements = array();
while($row = $rs->fetch_object()):
try {
$counter++;
$this->setObjectMapping($finfo, $row);
$elements[] = $row;
if(count($elements) >= $this->batchRecords) {
@$coll->batchInsert($elements);
$elements = array();
$this->log("\t$table $counter records\n");
//Create all index if they are not created yet
if(!$index_created) {
//Create table index
$this->createIndexes($table, $coll);
}
// //DEBUG
// echo "[OK]\n";
// return true;
}
//Use this function to insert single records
//@$coll->insert($row);
}
catch(Exception $e) {
echo "[ERR]\n";
$this->log("\t$table -> " . $e->getMessage()."\n");
return false;
}
endwhile;
//Insert remaining objects
try {
@$coll->batchInsert($elements);
//Create all index if they are not created yet
if(!$index_created) {
//Create table index
$this->createIndexes($table, $coll);
}
//Release memory
unset($elements);
$this->log("\n\t$table $counter records");
}
catch(Exception $e) {
echo "[ERR]\n";
$this->log("\t$table -> " . $e->getMessage()."\n");
return false;
}
echo "[OK]\n";
return true;
}
private function log($msg) {
if ($this->debugMode) {
echo $msg;
}
}
##################### END PRIVATE METHODS ##############################
}
test.php
require_once("MySqlToMongo.php");
$mysql_host = "localhost";
$mysql_user = "root";
$mysql_pswd = "";
$mysql_database = "test";
$mongo_host = "localhost";
$mongo_user = "";
$mongo_pswd = "";
$mongo_database = "test";
$imp = new MySqlToMongo();
//Establishing connections to mysql and mongo instances
$imp->mysql_connect($mysql_host, $mysql_user, $mysql_pswd, $mysql_database);
$imp->mongodb_connect($mongo_host, $mongo_user, $mongo_pswd, $mongo_database);
//Set the number of concurrent records to insert
$imp->setBatchRecords(1000);
//Set debug mode to read debug messages
$imp->setDebugMode(true);
//Get all tables from the database
//if you want to import single tables just pass an array of names ex: $list("tbl1","tbl2")
$tables = $imp->getMySqlTables();
//Set the dropping tables if exists into mongo. If false it will append records
$imp->setDropIfExists(true);
//Starting import session
$imp->import($tables);
'개발-데이터베이스' 카테고리의 다른 글
[함수] mysql 문자열을 구분자로 구분해서 처리시 관련 함수 (0) | 2016.03.30 |
---|