Laravel 增量同步数据方法,通过更新时间和ID进行增量更新、插入。

in 日常随笔 with 0 comment 访问: 387 次

示例源码

<?php

namespace App\Console\Commands;


use App\Utils\ConstantUtil;
use Carbon\Carbon;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Redis;

class AnalysesDataHandleCommand extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'sync:analysis';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = '同步admin_users和platform_products到adb';

    // 主库要同步的表(备份库的表结构先建好)
    protected $tables = ['admin_users', 'platform_products'];

    // 主库连接
    protected $sourceConnection = 'mysql';

    // 备份库连接比如adb
    protected $targetConnection = 'adb';

    // 时间缓存前缀
    protected $cacheKeyPrefix = ConstantUtil::SYNC_ANALYSES_LAST;


    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Execute the console command.
     *
     * @return int
     */
    public function handle()
    {
        $this->syncTable();
    }

    /**
     * 通用远程表数据同步方案
     */
    public function syncTable()
    {
        foreach ($this->tables as $table) {
            $this->info("开始同步表: {$table}");

            // 获取最新同步的时间
            $lastSyncTime = $this->getLastSyncTime($table);
            // print_r([$table,$lastSyncTime]);exit;
            $newOrUpdateRows = $this->fetchNewOrUpdateRows($table, $lastSyncTime);
            $this->info("发现 " . $newOrUpdateRows->count() . " 条待同步数据");

            $bar = $this->output->createProgressBar($newOrUpdateRows->count());
            $bar->start();

            foreach ($newOrUpdateRows as $row) {
                $id = $row->id;
                $data = (array) $row;
                $updatedAt = $row->updated_at;

                $exists = DB::connection($this->targetConnection)->table($table)
                    ->where('id', $id)
                    ->exists();

                if (!$exists) {
                    DB::connection($this->targetConnection)->table($table)->insert($data);
                    $this->info(" + 插入 ID:{$id}");
                } else {
                    $targetUpdatedAt = DB::connection($this->targetConnection)->table($table)
                        ->where('id', $id)
                        ->value('updated_at');

                    if ($updatedAt > $targetUpdatedAt) {
                        // 无法更新主键,除id外都需要去掉主键索引,因为可以被更新
                        unset($data['id']);
                        // print_r($data);exit;

                        DB::connection($this->targetConnection)->table($table)
                            ->where('id', $id)
                            ->update($data);
                        $this->info("更新 ID:{$id}");
                    } else {
                        $this->comment("跳过未变更 ID:{$id}");
                    }
                }
                $bar->advance();
            }

            $bar->finish();
            $this->newLine();

            // 记录当前同步时间
            $maxUpdatedAt = $newOrUpdateRows->max('updated_at');
            $this->setLastSyncTime($table, $maxUpdatedAt ?: now());
            $this->info("表 {$table} 同步完成,下次同步时间已记录");
        }
        $this->info("所有表同步完成!");
    }

    /**
     * 获取某张表的上次同步时间(从 Cache)
     */
    protected function getLastSyncTime($table)
    {
        $key = $this->cacheKeyPrefix . $table;
        $time = Redis::get($key);
        $time && $time = json_decode($time);


        if (!$time) {
            $this->warn("表 {$table} 无上次同步时间记录,将同步所有数据(或首次同步)");
            return null;
        }

        return Carbon::parse($time);
    }

    /**
     * 设置某张表的上次同步时间(存入 Cache)
     * @param $table
     * @param $time
     */
    protected function setLastSyncTime($table, $time)
    {
        $key = $this->cacheKeyPrefix . $table;
        Redis::set($key, json_encode($time));
    }

    /**
     * 从源库获取 updated_at > 上次同步时间 的数据
     * @param $table
     * @param Carbon|null $lastSyncTime
     * @return \Illuminate\Support\Collection
     */
    protected function fetchNewOrUpdateRows($table, ?Carbon $lastSyncTime)
    {
        $query = DB::connection($this->sourceConnection)->table($table);

        if ($lastSyncTime) {
            $query->where('updated_at', '>', $lastSyncTime);
        } else {
            // 避免全量更新,未同步则只查询最近一天的数据,有则更新。
            $this->warn("表 {$table} 无上次同步时间,执行首次同步(查询范围限制前一天)");
            $query->where('updated_at', '>=', now()->subDay());
            // $query->orderByDesc('updated_at')->limit(1000);
        }

        return $query->orderBy('updated_at')->get();
    }

}

赞赏支持
Responses