示例源码
<?php
namespace App\Console\Commands;
use App\Utils\ConstantUtil;
use Carbon\Carbon;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Redis;
class AnalysesDataHandleCommand extends Command
{
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'sync:analysis';
/**
* The console command description.
*
* @var string
*/
protected $description = '同步admin_users和platform_products到adb';
// 主库要同步的表(备份库的表结构先建好)
protected $tables = ['admin_users', 'platform_products'];
// 主库连接
protected $sourceConnection = 'mysql';
// 备份库连接比如adb
protected $targetConnection = 'adb';
// 时间缓存前缀
protected $cacheKeyPrefix = ConstantUtil::SYNC_ANALYSES_LAST;
/**
* Create a new command instance.
*
* @return void
*/
public function __construct()
{
parent::__construct();
}
/**
* Execute the console command.
*
* @return int
*/
public function handle()
{
$this->syncTable();
}
/**
* 通用远程表数据同步方案
*/
public function syncTable()
{
foreach ($this->tables as $table) {
$this->info("开始同步表: {$table}");
// 获取最新同步的时间
$lastSyncTime = $this->getLastSyncTime($table);
// print_r([$table,$lastSyncTime]);exit;
$newOrUpdateRows = $this->fetchNewOrUpdateRows($table, $lastSyncTime);
$this->info("发现 " . $newOrUpdateRows->count() . " 条待同步数据");
$bar = $this->output->createProgressBar($newOrUpdateRows->count());
$bar->start();
foreach ($newOrUpdateRows as $row) {
$id = $row->id;
$data = (array) $row;
$updatedAt = $row->updated_at;
$exists = DB::connection($this->targetConnection)->table($table)
->where('id', $id)
->exists();
if (!$exists) {
DB::connection($this->targetConnection)->table($table)->insert($data);
$this->info(" + 插入 ID:{$id}");
} else {
$targetUpdatedAt = DB::connection($this->targetConnection)->table($table)
->where('id', $id)
->value('updated_at');
if ($updatedAt > $targetUpdatedAt) {
// 无法更新主键,除id外都需要去掉主键索引,因为可以被更新
unset($data['id']);
// print_r($data);exit;
DB::connection($this->targetConnection)->table($table)
->where('id', $id)
->update($data);
$this->info("更新 ID:{$id}");
} else {
$this->comment("跳过未变更 ID:{$id}");
}
}
$bar->advance();
}
$bar->finish();
$this->newLine();
// 记录当前同步时间
$maxUpdatedAt = $newOrUpdateRows->max('updated_at');
$this->setLastSyncTime($table, $maxUpdatedAt ?: now());
$this->info("表 {$table} 同步完成,下次同步时间已记录");
}
$this->info("所有表同步完成!");
}
/**
* 获取某张表的上次同步时间(从 Cache)
*/
protected function getLastSyncTime($table)
{
$key = $this->cacheKeyPrefix . $table;
$time = Redis::get($key);
$time && $time = json_decode($time);
if (!$time) {
$this->warn("表 {$table} 无上次同步时间记录,将同步所有数据(或首次同步)");
return null;
}
return Carbon::parse($time);
}
/**
* 设置某张表的上次同步时间(存入 Cache)
* @param $table
* @param $time
*/
protected function setLastSyncTime($table, $time)
{
$key = $this->cacheKeyPrefix . $table;
Redis::set($key, json_encode($time));
}
/**
* 从源库获取 updated_at > 上次同步时间 的数据
* @param $table
* @param Carbon|null $lastSyncTime
* @return \Illuminate\Support\Collection
*/
protected function fetchNewOrUpdateRows($table, ?Carbon $lastSyncTime)
{
$query = DB::connection($this->sourceConnection)->table($table);
if ($lastSyncTime) {
$query->where('updated_at', '>', $lastSyncTime);
} else {
// 避免全量更新,未同步则只查询最近一天的数据,有则更新。
$this->warn("表 {$table} 无上次同步时间,执行首次同步(查询范围限制前一天)");
$query->where('updated_at', '>=', now()->subDay());
// $query->orderByDesc('updated_at')->limit(1000);
}
return $query->orderBy('updated_at')->get();
}
}
赞赏支持
本文由 litblc 创作,采用 知识共享署名4.0 国际许可协议进行许可
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名
最后编辑时间为: Sep 8, 2025 at 10:09 am

