개발-데이터베이스

[함수] Mysql To mongodb

WEBKIKIS 2016. 3. 30. 17:43
반응형

[함수] Mysql To mongodb


phpclasses.org에서 가져온 소스입니다.

MySqlToMongo.php


/**
 * Copies MySQL tables into MongoDB collections, one collection per table.
 *
 * Uses mysqli for reading and the legacy "mongo" PHP extension
 * (MongoClient / MongoDate) for writing. Rows are streamed from MySQL with an
 * unbuffered query and written to MongoDB in batches.
 *
 * Typical usage: mysql_connect(), mongodb_connect(), optional setters,
 * then import(getMySqlTables()).
 */
class MySqlToMongo
{

    // List of mysql field types, keyed by the numeric type code that
    // mysqli_result::fetch_fields() reports in ->type.
    private $mysql_data_type_hash = array(
        1 => 'tinyint',
        2 => 'smallint',
        3 => 'int',
        4 => 'float',
        5 => 'double',
        7 => 'timestamp',
        8 => 'bigint',
        9 => 'mediumint',
        10 => 'date',
        11 => 'time',
        12 => 'datetime',
        13 => 'year',
        16 => 'bit',
        //252 is currently mapped to all text and blob types (MySQL 5.0.51a)
        252 => 'string',
        253 => 'varchar',
        254 => 'char',
        246 => 'decimal'
    );

    // Primary MySQL connection; streams the table rows (unbuffered).
    private $mysqli;

    // Second MySQL connection; used for SHOW INDEX while the primary
    // connection still has an unbuffered result open.
    private $mysqli2;

    // Table names returned by the last getMySqlTables() call.
    private $mysql_tables;

    // MongoClient instance.
    private $mongo_client;

    // Selected MongoDB database.
    private $mongo_db;

    // When true, log() echoes progress messages.
    private $debugMode = false;

    // Number of rows collected before each batchInsert().
    private $batchRecords = 10000;

    // When true, collections are dropped before import and indexes are recreated.
    private $dropIfExists = true;


    public function __construct()
    {
        //Your custom code here
    }


    /**
     * Closes whichever connections were actually opened.
     *
     * Each handle is checked first: calling close() on a property that was
     * never assigned would be a fatal error that neither "@" nor
     * catch (Exception) can intercept.
     */
    public function __destruct()
    {
        $connections = array($this->mysqli, $this->mysqli2, $this->mongo_client);

        foreach ($connections as $connection) {
            if (!is_object($connection)) {
                continue;
            }
            try {
                @$connection->close();
            }
            catch (Exception $e) {
                // Best effort: nothing useful can be done during teardown.
            }
        }
    }


    /**
     * Sets how many rows are buffered before they are written to MongoDB.
     *
     * @param int $batchRecords
     */
    public function setBatchRecords($batchRecords)
    {
        $this->batchRecords = $batchRecords;
    }


    /**
     * Chooses between replacing existing collections (true) and appending
     * to them (false). When false, no indexes are created either.
     *
     * @param bool $dropIfExists
     */
    public function setDropIfExists($dropIfExists)
    {
        $this->dropIfExists = $dropIfExists;
    }


    /**
     * Enables or disables the progress messages printed through log().
     *
     * @param mixed $debugMode Any value; it is cast to bool.
     */
    public function setDebugMode($debugMode)
    {
        $this->debugMode = (bool) $debugMode;
    }


    /**
     * Opens the two MySQL connections used by the import.
     *
     * Terminates the script with a message when a connection cannot be
     * established (same behaviour as before, but failures reported through
     * connect_errno are now detected as well).
     */
    public function mysql_connect($mysql_host, $mysql_user, $mysql_pswd, $mysql_database, $mysql_port = 3306)
    {
        try {
            $this->mysqli = $this->openMysqlConnection($mysql_host, $mysql_user, $mysql_pswd, $mysql_database, $mysql_port);
            $this->mysqli2 = $this->openMysqlConnection($mysql_host, $mysql_user, $mysql_pswd, $mysql_database, $mysql_port);
        }
        catch (Exception $e) {
            die("\nError connecting to mysql: " . $e->getMessage());
        }
    }


    /**
     * Connects to MongoDB and selects the target database.
     *
     * The credentials part of the URI is only emitted when a user name is
     * given; an empty user used to produce "mongodb://:@host".
     *
     * @param array $mongo_options Passed through to the MongoClient constructor.
     */
    public function mongodb_connect($mongo_host, $mongo_user, $mongo_pswd, $mongo_database, $mongo_options = array())
    {
        try {
            $credentials = '';
            if ((string) $mongo_user !== '') {
                $credentials = $mongo_user . ":" . $mongo_pswd . "@";
            }

            $this->mongo_client = new MongoClient("mongodb://" . $credentials . $mongo_host, $mongo_options);
            $this->mongo_db = $this->mongo_client->selectDB($mongo_database);
        }
        catch (Exception $e) {
            die("\nError connecting to mongo: " . $e->getMessage());
        }
    }


    /**
     * Returns the names listed by SHOW TABLES for the connected database.
     *
     * @return array List of table names.
     * @throws Exception When the query fails.
     */
    public function getMySqlTables()
    {
        $rs = $this->mysqli->query("SHOW TABLES");
        if (!$rs) {
            throw new Exception("Error listing tables: " . $this->mysqli->error);
        }

        $this->mysql_tables = array();
        while ($row = $rs->fetch_array()) {
            $this->mysql_tables[] = $row[0];
        }
        $rs->free();

        return $this->mysql_tables;
    }


    /**
     * Imports every table in $tables into a collection of the same name.
     *
     * A table that fails is reported on stdout by insertValues() and the
     * loop continues with the next one.
     *
     * @param array $tables Table names.
     * @throws Exception When $tables is not an array.
     */
    public function import($tables)
    {
        if (!is_array($tables)) {
            throw new Exception("tables is not an array");
        }

        foreach ($tables as $table) {
            if ($this->dropIfExists) {
                $this->dropCollection($table);
            }

            $this->insertValues($table);
        }
    }


    ##################### START PRIVATE METHODS ##############################


    /**
     * Opens one mysqli connection and switches it to utf8.
     *
     * @return mysqli
     * @throws Exception When connecting or selecting the charset fails.
     */
    private function openMysqlConnection($host, $user, $pswd, $database, $port)
    {
        $link = new mysqli($host, $user, $pswd, $database, $port);

        // In the non-throwing report mode a failed connect is only visible here.
        if ($link->connect_errno) {
            throw new Exception($link->connect_error, $link->connect_errno);
        }

        // set_charset() replaces the former "SET NAMES utf8" /
        // "SET CHARACTER_SET_RESULTS=utf8" queries.
        if (!$link->set_charset('utf8')) {
            throw new Exception("Unable to select utf8: " . $link->error);
        }

        return $link;
    }


    /**
     * Wraps a MySQL identifier in backticks, doubling embedded backticks, so
     * names that are reserved words or contain special characters stay valid.
     */
    private function quoteIdentifier($name)
    {
        return '`' . str_replace('`', '``', $name) . '`';
    }


    /**
     * Recreates the MySQL indexes of $table on the MongoDB collection.
     *
     * Does nothing when dropIfExists is off (append mode). Index parts are
     * taken in the order SHOW INDEX returns them. An index that has a part
     * without a column name is skipped, because there is no field to map it to.
     *
     * @param string $table
     * @param object $coll Collection returned by selectCollection().
     * @return bool Always true.
     * @throws Exception When SHOW INDEX fails.
     */
    private function createIndexes($table, $coll)
    {
        if (!$this->dropIfExists) {
            return true;
        }

        // Runs on the second connection: the first one is busy streaming rows.
        $rs = $this->mysqli2->query("SHOW INDEX FROM " . $this->quoteIdentifier($table));
        if (!$rs) {
            throw new Exception("Error getting index from table $table");
        }

        // Group the rows (one per index part) by index name.
        $indexes = array();
        while ($row = $rs->fetch_object()) {
            $name = $row->Key_name;

            if (!isset($indexes[$name])) {
                $indexes[$name] = array(
                    "fields" => array(),
                    "unique" => ((int) $row->Non_unique) === 0,
                    "supported" => true
                );
            }

            if ($row->Column_name === null) {
                $indexes[$name]["supported"] = false;
                continue;
            }

            // Collation "D" marks a descending index part.
            $descending = isset($row->Collation) && $row->Collation === 'D';
            $indexes[$name]["fields"][$row->Column_name] = $descending ? -1 : 1;
        }
        $rs->free();

        foreach ($indexes as $name => $index) {
            if (!$index["supported"] || empty($index["fields"])) {
                $this->log("\t$table index $name skipped\n");
                continue;
            }

            //Save the index to mongodb
            $coll->ensureIndex($index["fields"], array(
                "unique" => $index["unique"],
                "dropDups" => FALSE,
                "sparse" => FALSE,
                "name" => $name
            ));
        }

        return true;
    }


    /**
     * Converts the string values of one fetched row to native types, in place.
     *
     * - integer types become int, but only when the cast is lossless
     * - float/double/decimal become float (decimal may lose precision)
     * - date/datetime/timestamp become MongoDate when strtotime() can parse them
     * - NULL values and unknown field types are left untouched
     *
     * @param array  $finfo Field metadata from mysqli_result::fetch_fields().
     * @param object $obj   Row object from fetch_object(); modified in place.
     */
    private function setObjectMapping($finfo, &$obj)
    {
        $integer_types = array("tinyint", "smallint", "mediumint", "int", "bigint", "year");
        $float_types = array("float", "double", "decimal");
        $date_types = array("date", "datetime", "timestamp");

        foreach ($finfo as $field) {
            if (!isset($this->mysql_data_type_hash[$field->type])) {
                //Mysql field type not found
                continue;
            }
            $type = $this->mysql_data_type_hash[$field->type];
            $name = $field->name;

            // Keep SQL NULL as null instead of turning it into 0 or a bogus date.
            if (!isset($obj->{$name})) {
                continue;
            }
            $value = $obj->{$name};

            if (in_array($type, $integer_types, true)) {
                $as_int = (int) $value;
                // Values outside the PHP integer range stay strings rather
                // than being silently changed by the cast.
                if ((string) $as_int === (string) $value) {
                    $obj->{$name} = $as_int;
                }
            }
            else if (in_array($type, $float_types, true)) {
                $obj->{$name} = (float) $value;
            }
            else if (in_array($type, $date_types, true)) {
                $seconds = strtotime($value);
                // Unparseable dates keep their original string form.
                if ($seconds !== false) {
                    $obj->{$name} = new MongoDate($seconds);
                }
            }
        }
    }


    /**
     * Drops the collection named after $table.
     *
     * @return bool Always true.
     */
    private function dropCollection($table)
    {
        $coll = $this->mongo_db->selectCollection($table);
        $coll->drop();
        return true;
    }


    /**
     * Streams all rows of $table into the collection of the same name.
     *
     * Prints a status line ending in [OK] or [ERR]. Indexes are created once:
     * after the first full batch, or after the final partial batch when the
     * table is smaller than one batch.
     *
     * @param string $table
     * @return bool True on success, false when reading or writing failed.
     */
    private function insertValues($table)
    {
        echo date("Y-m-d H:i:s")." Insert record for $table ... ";
        $this->log("\n");
        $coll = $this->mongo_db->selectCollection($table);

        // Unbuffered: rows are fetched from the server as they are consumed.
        $sql = "SELECT * FROM " . $this->quoteIdentifier($table);
        $rs = $this->mysqli->query($sql, MYSQLI_USE_RESULT);
        if (!$rs) {
            echo "[ERR]\n";
            $this->log("\t$table -> " . $this->mysqli->error . "\n");
            return false;
        }

        //Get fields types for mapping
        $finfo = $rs->fetch_fields();

        $counter = 0;

        //Variable to check if indexes are created for this collection
        $index_created = false;

        //List of all elements to insert
        $elements = array();

        try {
            while ($row = $rs->fetch_object()) {
                $counter++;

                $this->setObjectMapping($finfo, $row);
                $elements[] = $row;

                if (count($elements) >= $this->batchRecords) {
                    $coll->batchInsert($elements);
                    $elements = array();
                    $this->log("\t$table $counter records\n");

                    //Create all index if they are not created yet
                    if (!$index_created) {
                        $this->createIndexes($table, $coll);
                        $index_created = true;
                    }
                }
            }

            //Insert remaining objects. Skipped when nothing is left (empty
            //table or exact multiple of the batch size): an empty batch may
            //be rejected by the driver.
            if (!empty($elements)) {
                $coll->batchInsert($elements);
            }

            //Create all index if they are not created yet
            if (!$index_created) {
                $this->createIndexes($table, $coll);
                $index_created = true;
            }

            //Release memory
            unset($elements);
            $this->log("\n\t$table $counter records");
        }
        catch (Exception $e) {
            // Free the unbuffered result so the connection is usable for the next table.
            $rs->free();
            echo "[ERR]\n";
            $this->log("\t$table -> " . $e->getMessage()."\n");
            return false;
        }

        $rs->free();

        echo "[OK]\n";
        return true;
    }


    /**
     * Echoes $msg when debug mode is on.
     */
    private function log($msg)
    {
        if ($this->debugMode) {
            echo $msg;
        }
    }


    ##################### END PRIVATE METHODS ##############################

}

test.php


require_once("MySqlToMongo.php");

// --- MySQL source -----------------------------------------------------------
$sourceHost = "localhost";
$sourceUser = "root";
$sourcePassword = "";
$sourceDatabase = "test";

// --- MongoDB target ---------------------------------------------------------
$targetHost = "localhost";
$targetUser = "";
$targetPassword = "";
$targetDatabase = "test";

$importer = new MySqlToMongo();

// Import options:
//  - rows are written to mongo in batches of this size
//  - debug mode prints the progress messages
//  - true drops existing collections first; false appends to them
$importer->setBatchRecords(1000);
$importer->setDebugMode(true);
$importer->setDropIfExists(true);

// Open the connections to the mysql and mongo instances
$importer->mysql_connect($sourceHost, $sourceUser, $sourcePassword, $sourceDatabase);
$importer->mongodb_connect($targetHost, $targetUser, $targetPassword, $targetDatabase);

// Import every table of the database.
// To import only some tables, pass an array of names instead,
// e.g. array("tbl1", "tbl2")
$tableNames = $importer->getMySqlTables();

// Run the import
$importer->import($tableNames);

반응형