|
|
@@ -0,0 +1,796 @@
|
|
|
+# 本地-云端双向数据同步与连通性测试方案
|
|
|
+
|
|
|
+## 一、方案概述
|
|
|
+
|
|
|
+### 1.1 背景与约束
|
|
|
+- **本地服务器A**:不可信区,不受管控,不能暴露云端数据库密码
|
|
|
+- **云服务器**:可信区,持有数据库连接凭证
|
|
|
+- **同步需求**:双向同步(本地↔云端),支持手动触发与自动同步
|
|
|
+- **连通性测试**:验证本地Agent与云端API通道的可用性
|
|
|
+
|
|
|
+### 1.2 核心设计原则
|
|
|
+| 原则 | 说明 |
|
|
|
+|------|------|
|
|
|
+| **单向出站** | 所有连接由本地主动发起(HTTPS出站),云端零暴露 |
|
|
|
+| **密码隔离** | 云端DB密码仅存在于云端服务,本地仅持有API Token |
|
|
|
+| **游标断点续传** | 基于`sync_version`游标,支持断网恢复后增量同步 |
|
|
|
+| **冲突解决** | 默认`last-write-wins`,云端优先;可配置为本地优先 |
|
|
|
+| **分层检测** | 连通性测试覆盖网络层、认证层、租户层 |
|
|
|
+
|
|
|
+---
|
|
|
+
|
|
|
+## 二、架构设计
|
|
|
+
|
|
|
+```
|
|
|
+┌─────────────────────────────────────────────────────────────────┐
|
|
|
+│ 云服务器(可信区) │
|
|
|
+│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────────┐ │
|
|
|
+│ │ SyncReceive │ │ SyncCommand │ │ Cloud Data Service │ │
|
|
|
+│ │ Controller │ │ Controller │ │ (直连云端DB) │ │
|
|
|
+│ │ (接收推送) │ │ (下发指令/数据) │ │ │ │
|
|
|
+│ └──────┬─────────┘ └──────┬───────┘ └──────────────────────┘ │
|
|
|
+│ ▲ │ │
|
|
|
+│ │ │ │
|
|
|
+│ HTTPS│出站上推 │HTTPS出站长轮询拉取 │
|
|
|
+│ │ │ │
|
|
|
+│ ┌──────┴───────────────────┴──────────────────────────────────┐ │
|
|
|
+│ │ 本地服务器A(不可信区) │ │
|
|
|
+│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
|
|
|
+│ │ │ LocalPush │ │ LocalPull │ │ Local Data │ │ │
|
|
|
+│ │ │ Service │ │ Service │ │ Service │ │ │
|
|
|
+│ │ │ (读取本地变更) │ │ (拉云端指令) │ │ (操作本地DB) │ │ │
|
|
|
+│ │ └──────┬─────────┘ └──────┬───────┘ └──────┬───────┘ │ │
|
|
|
+│ │ │ │ │ │ │
|
|
|
+│ │ ┌──────┴───────────────────┴──────────────────┘ │ │
|
|
|
+│ │ │ Local Sync Agent (调度核心) │ │
|
|
|
+│ │ │ • Push线程:定时/实时扫描本地变更 → 推云端 │ │
|
|
|
+│ │ │ • Pull线程:长轮询云端指令队列 → 写入本地 │ │
|
|
|
+│ │ │ • 手动触发:REST接口接收任务 → 提交线程池 │ │
|
|
|
+│ │ └────────────────────────────────────────────────────────────┘ │
|
|
|
+│ └─────────────────────────────────────────────────────────────────┘
|
|
|
+```
|
|
|
+
|
|
|
+---
|
|
|
+
|
|
|
+## 三、数据库表设计
|
|
|
+
|
|
|
+### 3.1 本地业务表(示例:device_data)
|
|
|
+```sql
|
|
|
+CREATE TABLE local_device_data (
|
|
|
+ id BIGINT PRIMARY KEY AUTO_INCREMENT,
|
|
|
+ tenant_id VARCHAR(50) NOT NULL,
|
|
|
+ device_code VARCHAR(50),
|
|
|
+ sensor_value DECIMAL(10,2),
|
|
|
+ -- 同步控制字段
|
|
|
+ sync_version BIGINT COMMENT '数据版本时间戳(毫秒)',
|
|
|
+ sync_status TINYINT DEFAULT 0 COMMENT '0:未同步 1:已推送 2:推送失败',
|
|
|
+ sync_direction VARCHAR(10) DEFAULT 'LOCAL' COMMENT 'LOCAL/CLOUD 数据来源',
|
|
|
+ update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
|
|
|
+ create_time DATETIME DEFAULT CURRENT_TIMESTAMP
|
|
|
+);
|
|
|
+```
|
|
|
+
|
|
|
+### 3.2 本地同步游标表
|
|
|
+```sql
|
|
|
+CREATE TABLE local_sync_cursor (
|
|
|
+ id BIGINT PRIMARY KEY AUTO_INCREMENT,
|
|
|
+ tenant_id VARCHAR(50) NOT NULL,
|
|
|
+ table_name VARCHAR(50) NOT NULL,
|
|
|
+ direction VARCHAR(10) NOT NULL COMMENT 'UP/DOWN',
|
|
|
+ last_sync_version BIGINT DEFAULT 0 COMMENT '最后同步的版本号',
|
|
|
+ last_sync_time DATETIME,
|
|
|
+ UNIQUE KEY uk_tenant_table_dir (tenant_id, table_name, direction)
|
|
|
+);
|
|
|
+```
|
|
|
+
|
|
|
+### 3.3 云端下发队列(云端→本地)
|
|
|
+```sql
|
|
|
+CREATE TABLE cloud_sync_down_queue (
|
|
|
+ id BIGINT PRIMARY KEY AUTO_INCREMENT,
|
|
|
+ tenant_id VARCHAR(50) NOT NULL,
|
|
|
+ table_name VARCHAR(50) NOT NULL,
|
|
|
+ operation VARCHAR(20) COMMENT 'INSERT/UPDATE/DELETE',
|
|
|
+ payload JSON NOT NULL COMMENT '完整数据JSON',
|
|
|
+ sync_version BIGINT NOT NULL,
|
|
|
+ status TINYINT DEFAULT 0 COMMENT '0:待拉取 1:已确认 2:失败',
|
|
|
+ create_time DATETIME DEFAULT CURRENT_TIMESTAMP,
|
|
|
+ INDEX idx_tenant_status (tenant_id, status)
|
|
|
+);
|
|
|
+```
|
|
|
+
|
|
|
+---
|
|
|
+
|
|
|
+## 四、公共实体与枚举
|
|
|
+
|
|
|
+### 4.1 同步方向枚举
|
|
|
+```java
|
|
|
+@Getter
|
|
|
+@AllArgsConstructor
|
|
|
+public enum SyncDirection {
|
|
|
+ UP("本地→云端"),
|
|
|
+ DOWN("云端→本地"),
|
|
|
+ BOTH("双向");
|
|
|
+ private final String desc;
|
|
|
+}
|
|
|
+```
|
|
|
+
|
|
|
+### 4.2 数据包
|
|
|
+```java
|
|
|
+@Data
|
|
|
+@Builder
|
|
|
+public class SyncPacket<T> {
|
|
|
+ private String tenantId;
|
|
|
+ private String tableName;
|
|
|
+ private SyncDirection direction;
|
|
|
+ private List<T> dataList;
|
|
|
+ private Long batchVersion;
|
|
|
+ private Boolean hasMore;
|
|
|
+ private String nonce;
|
|
|
+}
|
|
|
+```
|
|
|
+
|
|
|
+### 4.3 同步响应
|
|
|
+```java
|
|
|
+@Data
|
|
|
+@Builder
|
|
|
+public class SyncResponse {
|
|
|
+ private boolean success;
|
|
|
+ private String message;
|
|
|
+ private Long confirmedVersion;
|
|
|
+ private Integer acceptedCount;
|
|
|
+}
|
|
|
+```
|
|
|
+
|
|
|
+### 4.4 手动触发请求
|
|
|
+```java
|
|
|
+@Data
|
|
|
+public class SyncTriggerRequest {
|
|
|
+ private String tenantId;
|
|
|
+ private String tableName;
|
|
|
+ private SyncDirection direction;
|
|
|
+ private Boolean fullSync; // 是否全量同步(重置游标)
|
|
|
+}
|
|
|
+```
|
|
|
+
|
|
|
+---
|
|
|
+
|
|
|
+## 五、云端服务实现
|
|
|
+
|
|
|
+### 5.1 云端接收控制器(本地→云端)
|
|
|
+```java
|
|
|
+@Slf4j
|
|
|
+@RestController
|
|
|
+@RequestMapping("/api/cloud/sync")
|
|
|
+@RequiredArgsConstructor
|
|
|
+public class CloudSyncReceiveController {
|
|
|
+
|
|
|
+ private final CloudSyncService cloudSyncService;
|
|
|
+ private final CloudAuthService cloudAuthService;
|
|
|
+
|
|
|
+ @PostMapping("/receive")
|
|
|
+ public SyncResponse receive(
|
|
|
+ @RequestHeader("X-App-Token") String token,
|
|
|
+ @RequestHeader("X-Sign") String sign,
|
|
|
+ @RequestBody @Valid SyncPacket packet) {
|
|
|
+
|
|
|
+ if (!cloudAuthService.validate(token, sign, packet)) {
|
|
|
+ return SyncResponse.builder().success(false).message("认证失败或请求已过期").build();
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!cloudSyncService.validateTenant(packet.getTenantId())) {
|
|
|
+ return SyncResponse.builder().success(false).message("租户不存在或已停用").build();
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ int count = cloudSyncService.batchUpsert(packet);
|
|
|
+ return SyncResponse.builder()
|
|
|
+ .success(true)
|
|
|
+ .message("接收成功")
|
|
|
+ .acceptedCount(count)
|
|
|
+ .confirmedVersion(packet.getBatchVersion())
|
|
|
+ .build();
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("云端接收数据失败, tenantId={}", packet.getTenantId(), e);
|
|
|
+ return SyncResponse.builder().success(false).message("写入失败:" + e.getMessage()).build();
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+```
|
|
|
+
|
|
|
+### 5.2 云端下发控制器(云端→本地)
|
|
|
+```java
|
|
|
+@Slf4j
|
|
|
+@RestController
|
|
|
+@RequestMapping("/api/cloud/sync")
|
|
|
+@RequiredArgsConstructor
|
|
|
+public class CloudSyncCommandController {
|
|
|
+
|
|
|
+ private final CloudSyncService cloudSyncService;
|
|
|
+ private final CloudAuthService cloudAuthService;
|
|
|
+
|
|
|
+ @GetMapping("/poll")
|
|
|
+ public SyncPacket poll(
|
|
|
+ @RequestHeader("X-App-Token") String token,
|
|
|
+ @RequestParam String tenantId,
|
|
|
+ @RequestParam String tableName,
|
|
|
+ @RequestParam(required = false) Long lastVersion) {
|
|
|
+
|
|
|
+ if (!cloudAuthService.validateToken(token)) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ return cloudSyncService.pollDownQueue(tenantId, tableName, lastVersion);
|
|
|
+ }
|
|
|
+
|
|
|
+ @PostMapping("/poll/ack")
|
|
|
+ public SyncResponse ackPoll(
|
|
|
+ @RequestHeader("X-App-Token") String token,
|
|
|
+ @RequestBody List<Long> queueIds) {
|
|
|
+
|
|
|
+ if (!cloudAuthService.validateToken(token)) {
|
|
|
+ return SyncResponse.builder().success(false).message("认证失败").build();
|
|
|
+ }
|
|
|
+ cloudSyncService.ackDownQueue(queueIds);
|
|
|
+ return SyncResponse.builder().success(true).build();
|
|
|
+ }
|
|
|
+}
|
|
|
+```
|
|
|
+
|
|
|
+### 5.3 云端同步服务(冲突解决:云端优先)
|
|
|
+```java
|
|
|
+@Slf4j
|
|
|
+@Service
|
|
|
+@RequiredArgsConstructor
|
|
|
+public class CloudSyncService {
|
|
|
+
|
|
|
+ private final CloudDeviceDataMapper deviceDataMapper;
|
|
|
+ private final CloudSyncDownQueueMapper downQueueMapper;
|
|
|
+ private final TenantConfigMapper tenantConfigMapper;
|
|
|
+
|
|
|
+ public boolean validateTenant(String tenantId) {
|
|
|
+ TenantConfig config = tenantConfigMapper.selectByTenantId(tenantId);
|
|
|
+ return config != null && config.getStatus() == 1;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Transactional(rollbackFor = Exception.class)
|
|
|
+ public int batchUpsert(SyncPacket packet) {
|
|
|
+ List<CloudDeviceData> list = convertToCloudEntity(packet);
|
|
|
+ int count = 0;
|
|
|
+ for (CloudDeviceData data : list) {
|
|
|
+ CloudDeviceData exist = deviceDataMapper.selectByUk(data.getTenantId(), data.getDeviceCode());
|
|
|
+ if (exist == null) {
|
|
|
+ deviceDataMapper.insert(data);
|
|
|
+ count++;
|
|
|
+ } else if (data.getSyncVersion() >= exist.getSyncVersion()) {
|
|
|
+ data.setId(exist.getId());
|
|
|
+ deviceDataMapper.updateById(data);
|
|
|
+ count++;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return count;
|
|
|
+ }
|
|
|
+
|
|
|
+ public SyncPacket pollDownQueue(String tenantId, String tableName, Long lastVersion) {
|
|
|
+ if (lastVersion == null) lastVersion = 0L;
|
|
|
+ List<CloudSyncDownQueue> queueList = downQueueMapper.selectPending(
|
|
|
+ tenantId, tableName, lastVersion, 100);
|
|
|
+
|
|
|
+ if (queueList.isEmpty()) return null;
|
|
|
+
|
|
|
+ Long maxVersion = queueList.stream()
|
|
|
+ .mapToLong(CloudSyncDownQueue::getSyncVersion)
|
|
|
+ .max().orElse(lastVersion);
|
|
|
+
|
|
|
+ return SyncPacket.builder()
|
|
|
+ .tenantId(tenantId)
|
|
|
+ .tableName(tableName)
|
|
|
+ .direction(SyncDirection.DOWN)
|
|
|
+ .dataList(queueList)
|
|
|
+ .batchVersion(maxVersion)
|
|
|
+ .hasMore(queueList.size() >= 100)
|
|
|
+ .build();
|
|
|
+ }
|
|
|
+
|
|
|
+ public void ackDownQueue(List<Long> queueIds) {
|
|
|
+ if (queueIds == null || queueIds.isEmpty()) return;
|
|
|
+ downQueueMapper.updateStatus(queueIds, 1);
|
|
|
+ }
|
|
|
+}
|
|
|
+```
|
|
|
+
|
|
|
+---
|
|
|
+
|
|
|
+## 六、本地服务实现
|
|
|
+
|
|
|
+### 6.1 手动触发接口
|
|
|
+```java
|
|
|
+@RestController
|
|
|
+@RequestMapping("/api/local/sync")
|
|
|
+@RequiredArgsConstructor
|
|
|
+public class LocalSyncTriggerController {
|
|
|
+
|
|
|
+ private final LocalSyncAgent syncAgent;
|
|
|
+
|
|
|
+ @PostMapping("/trigger")
|
|
|
+ public String trigger(@RequestBody @Valid SyncTriggerRequest request) {
|
|
|
+ syncAgent.submitTask(request);
|
|
|
+ return "同步任务已提交: " + request.getDirection() +
|
|
|
+ ", tenant=" + request.getTenantId() +
|
|
|
+ ", table=" + request.getTableName();
|
|
|
+ }
|
|
|
+
|
|
|
+ @GetMapping("/status")
|
|
|
+ public String status() {
|
|
|
+ return syncAgent.getStatus();
|
|
|
+ }
|
|
|
+}
|
|
|
+```
|
|
|
+
|
|
|
+### 6.2 本地Agent核心调度器
|
|
|
+```java
|
|
|
+@Slf4j
|
|
|
+@Component
|
|
|
+@RequiredArgsConstructor
|
|
|
+public class LocalSyncAgent {
|
|
|
+
|
|
|
+ private final LocalPushService pushService;
|
|
|
+ private final LocalPullService pullService;
|
|
|
+ private final ExecutorService executor = new ThreadPoolExecutor(
|
|
|
+ 4, 8, 60, TimeUnit.SECONDS,
|
|
|
+ new LinkedBlockingQueue<>(100),
|
|
|
+ new ThreadFactoryBuilder().setNameFormat("sync-agent-%d").build(),
|
|
|
+ new ThreadPoolExecutor.CallerRunsPolicy()
|
|
|
+ );
|
|
|
+
|
|
|
+ @PostConstruct
|
|
|
+ public void init() {
|
|
|
+ log.info("本地同步Agent启动");
|
|
|
+ executor.submit(this::pullLoop);
|
|
|
+ }
|
|
|
+
|
|
|
+ @PreDestroy
|
|
|
+ public void destroy() {
|
|
|
+ executor.shutdown();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Scheduled(fixedDelay = 5000)
|
|
|
+ public void autoPush() {
|
|
|
+ try {
|
|
|
+ pushService.pushAllPending("device_data");
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("自动推送失败", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void pullLoop() {
|
|
|
+ while (!Thread.currentThread().isInterrupted()) {
|
|
|
+ try {
|
|
|
+ pullService.pullAndApply("device_data");
|
|
|
+ Thread.sleep(3000);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ break;
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("拉取云端数据失败", e);
|
|
|
+ try {
|
|
|
+ Thread.sleep(10000);
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void submitTask(SyncTriggerRequest request) {
|
|
|
+ executor.submit(() -> {
|
|
|
+ try {
|
|
|
+ if (request.getDirection() == SyncDirection.UP || request.getDirection() == SyncDirection.BOTH) {
|
|
|
+ if (Boolean.TRUE.equals(request.getFullSync())) {
|
|
|
+ pushService.resetCursor(request.getTenantId(), "device_data");
|
|
|
+ }
|
|
|
+ pushService.pushByTenant(request.getTenantId(), "device_data");
|
|
|
+ }
|
|
|
+ if (request.getDirection() == SyncDirection.DOWN || request.getDirection() == SyncDirection.BOTH) {
|
|
|
+ pullService.pullByTenant(request.getTenantId(), "device_data");
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("手动同步任务失败", e);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getStatus() {
|
|
|
+ return "运行中, 活跃线程:" + ((ThreadPoolExecutor) executor).getActiveCount();
|
|
|
+ }
|
|
|
+}
|
|
|
+```
|
|
|
+
|
|
|
+### 6.3 本地推送服务(本地→云端)
|
|
|
+```java
|
|
|
+@Slf4j
|
|
|
+@Service
|
|
|
+@RequiredArgsConstructor
|
|
|
+public class LocalPushService {
|
|
|
+
|
|
|
+ private final LocalDeviceDataMapper deviceDataMapper;
|
|
|
+ private final LocalSyncCursorMapper cursorMapper;
|
|
|
+ private final RestTemplate restTemplate;
|
|
|
+ private final ObjectMapper objectMapper;
|
|
|
+
|
|
|
+ @Value("${cloud.api.endpoint}")
|
|
|
+ private String cloudEndpoint;
|
|
|
+ @Value("${cloud.api.token}")
|
|
|
+ private String apiToken;
|
|
|
+
|
|
|
+ public void pushAllPending(String tableName) {
|
|
|
+ List<String> tenantIds = deviceDataMapper.selectPendingTenantIds(tableName);
|
|
|
+ for (String tenantId : tenantIds) {
|
|
|
+ try {
|
|
|
+ pushByTenant(tenantId, tableName);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("推送租户{}失败", tenantId, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void pushByTenant(String tenantId, String tableName) {
|
|
|
+ LocalSyncCursor cursor = getOrInitCursor(tenantId, tableName, "UP");
|
|
|
+ Long lastVersion = cursor.getLastSyncVersion();
|
|
|
+
|
|
|
+ while (true) {
|
|
|
+ List<LocalDeviceData> dataList = deviceDataMapper.selectByVersionGt(
|
|
|
+ tenantId, lastVersion, 100);
|
|
|
+ if (dataList.isEmpty()) break;
|
|
|
+
|
|
|
+ Long batchVersion = dataList.stream()
|
|
|
+ .mapToLong(LocalDeviceData::getSyncVersion)
|
|
|
+ .max().orElse(lastVersion);
|
|
|
+
|
|
|
+ SyncPacket packet = SyncPacket.builder()
|
|
|
+ .tenantId(tenantId)
|
|
|
+ .tableName(tableName)
|
|
|
+ .direction(SyncDirection.UP)
|
|
|
+ .dataList(dataList)
|
|
|
+ .batchVersion(batchVersion)
|
|
|
+ .hasMore(dataList.size() >= 100)
|
|
|
+ .nonce(UUID.randomUUID().toString())
|
|
|
+ .build();
|
|
|
+
|
|
|
+ SyncResponse response = pushToCloud(packet);
|
|
|
+ if (response != null && response.isSuccess()) {
|
|
|
+ lastVersion = response.getConfirmedVersion();
|
|
|
+ updateCursor(tenantId, tableName, "UP", lastVersion);
|
|
|
+ List<Long> ids = dataList.stream().map(LocalDeviceData::getId).toList();
|
|
|
+ deviceDataMapper.markSynced(ids);
|
|
|
+ if (Boolean.FALSE.equals(packet.getHasMore())) break;
|
|
|
+ } else {
|
|
|
+ log.error("云端接收失败: {}", response != null ? response.getMessage() : "无响应");
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void resetCursor(String tenantId, String tableName) {
|
|
|
+ cursorMapper.updateLastVersion(tenantId, tableName, "UP", 0L);
|
|
|
+ }
|
|
|
+
|
|
|
+ private SyncResponse pushToCloud(SyncPacket packet) {
|
|
|
+ try {
|
|
|
+ HttpHeaders headers = new HttpHeaders();
|
|
|
+ headers.setContentType(MediaType.APPLICATION_JSON);
|
|
|
+ headers.set("X-App-Token", apiToken);
|
|
|
+ headers.set("X-Sign", HmacUtils.sign(objectMapper.writeValueAsString(packet), apiToken));
|
|
|
+
|
|
|
+ ResponseEntity<SyncResponse> response = restTemplate.exchange(
|
|
|
+ cloudEndpoint + "/api/cloud/sync/receive",
|
|
|
+ HttpMethod.POST,
|
|
|
+ new HttpEntity<>(packet, headers),
|
|
|
+ SyncResponse.class
|
|
|
+ );
|
|
|
+ return response.getBody();
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("推送云端网络异常", e);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private LocalSyncCursor getOrInitCursor(String tenantId, String tableName, String direction) {
|
|
|
+ LocalSyncCursor cursor = cursorMapper.selectByUk(tenantId, tableName, direction);
|
|
|
+ if (cursor == null) {
|
|
|
+ cursor = new LocalSyncCursor();
|
|
|
+ cursor.setTenantId(tenantId);
|
|
|
+ cursor.setTableName(tableName);
|
|
|
+ cursor.setDirection(direction);
|
|
|
+ cursor.setLastSyncVersion(0L);
|
|
|
+ cursorMapper.insert(cursor);
|
|
|
+ }
|
|
|
+ return cursor;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void updateCursor(String tenantId, String tableName, String direction, Long version) {
|
|
|
+ cursorMapper.updateLastVersion(tenantId, tableName, direction, version);
|
|
|
+ }
|
|
|
+}
|
|
|
+```
|
|
|
+
|
|
|
+### 6.4 本地拉取服务(云端→本地)
|
|
|
+```java
|
|
|
+@Slf4j
|
|
|
+@Service
|
|
|
+@RequiredArgsConstructor
|
|
|
+public class LocalPullService {
|
|
|
+
|
|
|
+ private final LocalDeviceDataMapper deviceDataMapper;
|
|
|
+ private final LocalSyncCursorMapper cursorMapper;
|
|
|
+ private final RestTemplate restTemplate;
|
|
|
+ private final ObjectMapper objectMapper;
|
|
|
+
|
|
|
+ @Value("${cloud.api.endpoint}")
|
|
|
+ private String cloudEndpoint;
|
|
|
+ @Value("${cloud.api.token}")
|
|
|
+ private String apiToken;
|
|
|
+
|
|
|
+ public void pullAndApply(String tableName) {
|
|
|
+ List<String> tenantIds = cursorMapper.selectAllTenantIds();
|
|
|
+ for (String tenantId : tenantIds) {
|
|
|
+ try {
|
|
|
+ pullByTenant(tenantId, tableName);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("拉取租户{}数据失败", tenantId, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void pullByTenant(String tenantId, String tableName) {
|
|
|
+ LocalSyncCursor cursor = getOrInitCursor(tenantId, tableName, "DOWN");
|
|
|
+ Long lastVersion = cursor.getLastSyncVersion();
|
|
|
+
|
|
|
+ while (true) {
|
|
|
+ SyncPacket packet = pollCloud(tenantId, tableName, lastVersion);
|
|
|
+ if (packet == null || packet.getDataList() == null || packet.getDataList().isEmpty()) break;
|
|
|
+
|
|
|
+ List<CloudSyncDownQueue> queueList = (List<CloudSyncDownQueue>) packet.getDataList();
|
|
|
+ List<Long> ackIds = queueList.stream().map(CloudSyncDownQueue::getId).collect(Collectors.toList());
|
|
|
+
|
|
|
+ for (CloudSyncDownQueue queue : queueList) {
|
|
|
+ applyToLocal(queue);
|
|
|
+ }
|
|
|
+
|
|
|
+ lastVersion = packet.getBatchVersion();
|
|
|
+ updateCursor(tenantId, tableName, "DOWN", lastVersion);
|
|
|
+ ackCloud(ackIds);
|
|
|
+
|
|
|
+ if (Boolean.FALSE.equals(packet.getHasMore())) break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void applyToLocal(CloudSyncDownQueue queue) {
|
|
|
+ try {
|
|
|
+ LocalDeviceData data = objectMapper.readValue(queue.getPayload().toString(), LocalDeviceData.class);
|
|
|
+ data.setSyncVersion(queue.getSyncVersion());
|
|
|
+ data.setSyncDirection("CLOUD");
|
|
|
+ data.setSyncStatus(1);
|
|
|
+
|
|
|
+ LocalDeviceData exist = deviceDataMapper.selectByUk(data.getTenantId(), data.getDeviceCode());
|
|
|
+ if (exist == null) {
|
|
|
+ deviceDataMapper.insert(data);
|
|
|
+ } else if (queue.getSyncVersion() >= exist.getSyncVersion()) {
|
|
|
+ data.setId(exist.getId());
|
|
|
+ deviceDataMapper.updateById(data);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("应用云端数据到本地失败, queueId={}", queue.getId(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private SyncPacket pollCloud(String tenantId, String tableName, Long lastVersion) {
|
|
|
+ try {
|
|
|
+ String url = UriComponentsBuilder.fromHttpUrl(cloudEndpoint + "/api/cloud/sync/poll")
|
|
|
+ .queryParam("tenantId", tenantId)
|
|
|
+ .queryParam("tableName", tableName)
|
|
|
+ .queryParam("lastVersion", lastVersion)
|
|
|
+ .toUriString();
|
|
|
+ return restTemplate.getForObject(url, SyncPacket.class);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("轮询云端失败", e);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void ackCloud(List<Long> queueIds) {
|
|
|
+ try {
|
|
|
+ restTemplate.postForObject(
|
|
|
+ cloudEndpoint + "/api/cloud/sync/poll/ack",
|
|
|
+ queueIds,
|
|
|
+ SyncResponse.class
|
|
|
+ );
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("ACK云端失败", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private LocalSyncCursor getOrInitCursor(String tenantId, String tableName, String direction) {
|
|
|
+ LocalSyncCursor cursor = cursorMapper.selectByUk(tenantId, tableName, direction);
|
|
|
+ if (cursor == null) {
|
|
|
+ cursor = new LocalSyncCursor();
|
|
|
+ cursor.setTenantId(tenantId);
|
|
|
+ cursor.setTableName(tableName);
|
|
|
+ cursor.setDirection(direction);
|
|
|
+ cursor.setLastSyncVersion(0L);
|
|
|
+ cursorMapper.insert(cursor);
|
|
|
+ }
|
|
|
+ return cursor;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void updateCursor(String tenantId, String tableName, String direction, Long version) {
|
|
|
+ cursorMapper.updateLastVersion(tenantId, tableName, direction, version);
|
|
|
+ }
|
|
|
+}
|
|
|
+```
|
|
|
+
|
|
|
+---
|
|
|
+
|
|
|
+## 七、连通性测试接口
|
|
|
+
|
|
|
+### 7.1 测试目标转变
|
|
|
+| 原目标 | 新目标 |
|
|
|
+|--------|--------|
|
|
|
+| 本地能否连云端DB | 本地Agent能否正常推送/拉取 |
|
|
|
+| 云端租户配置是否存在 | 云端API返回的认证结果 |
|
|
|
+
|
|
|
+### 7.2 本地连通性测试接口
|
|
|
+```java
|
|
|
+@RestController
|
|
|
+@RequestMapping("/api/local/health")
|
|
|
+@RequiredArgsConstructor
|
|
|
+public class LocalHealthCheckController {
|
|
|
+
|
|
|
+ private final RestTemplate restTemplate;
|
|
|
+ @Value("${cloud.api.endpoint}")
|
|
|
+ private String cloudEndpoint;
|
|
|
+ @Value("${cloud.api.token}")
|
|
|
+ private String apiToken;
|
|
|
+
|
|
|
+ @GetMapping("/check")
|
|
|
+ public HealthCheckResponse check(@RequestParam String tenantId) {
|
|
|
+ long start = System.currentTimeMillis();
|
|
|
+
|
|
|
+ try {
|
|
|
+ String pingUrl = cloudEndpoint + "/api/cloud/sync/ping";
|
|
|
+ restTemplate.getForObject(pingUrl, String.class);
|
|
|
+
|
|
|
+ HttpHeaders headers = new HttpHeaders();
|
|
|
+ headers.set("X-App-Token", apiToken);
|
|
|
+ HttpEntity entity = new HttpEntity<>(headers);
|
|
|
+ ResponseEntity<String> authResp = restTemplate.exchange(
|
|
|
+ cloudEndpoint + "/api/cloud/sync/auth-check?tenantId=" + tenantId,
|
|
|
+ HttpMethod.GET, entity, String.class);
|
|
|
+
|
|
|
+ boolean tenantExists = "VALID".equals(authResp.getBody());
|
|
|
+
|
|
|
+ return HealthCheckResponse.builder()
|
|
|
+ .success(true)
|
|
|
+ .networkReachable(true)
|
|
|
+ .authValid(true)
|
|
|
+ .tenantExists(tenantExists)
|
|
|
+ .costTime(System.currentTimeMillis() - start)
|
|
|
+ .build();
|
|
|
+
|
|
|
+ } catch (ResourceAccessException e) {
|
|
|
+ return HealthCheckResponse.builder()
|
|
|
+ .success(false)
|
|
|
+ .message("网络不可达:" + e.getMessage())
|
|
|
+ .build();
|
|
|
+ } catch (HttpClientErrorException.Unauthorized e) {
|
|
|
+ return HealthCheckResponse.builder()
|
|
|
+ .success(false)
|
|
|
+ .message("认证失败:Token无效")
|
|
|
+ .build();
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+```
|
|
|
+
|
|
|
+### 7.3 响应示例
|
|
|
+```json
|
|
|
+{
|
|
|
+ "success": true,
|
|
|
+ "networkReachable": true,
|
|
|
+ "authValid": true,
|
|
|
+ "tenantExists": true,
|
|
|
+ "costTime": 245
|
|
|
+}
|
|
|
+```
|
|
|
+
|
|
|
+---
|
|
|
+
|
|
|
+## 八、配置说明
|
|
|
+
|
|
|
+### 8.1 本地配置(application.yml)
|
|
|
+```yaml
|
|
|
+server:
|
|
|
+ port: 9887
|
|
|
+
|
|
|
+cloud:
|
|
|
+ api:
|
|
|
+ endpoint: https://cloud.example.com
|
|
|
+ token: ${CLOUD_API_TOKEN} # 环境变量注入,非DB密码
|
|
|
+
|
|
|
+sync:
|
|
|
+ push:
|
|
|
+ batch-size: 100
|
|
|
+ fixed-delay-ms: 5000 # 准实时:每5秒扫描
|
|
|
+ pull:
|
|
|
+ interval-ms: 3000 # 长轮询间隔
|
|
|
+ fail-backoff-ms: 10000 # 失败退避
|
|
|
+```
|
|
|
+
|
|
|
+### 8.2 云端配置
|
|
|
+```yaml
|
|
|
+server:
|
|
|
+ port: 8080
|
|
|
+
|
|
|
+spring:
|
|
|
+ datasource:
|
|
|
+ url: jdbc:mysql://localhost:3306/cloud_db
|
|
|
+ username: cloud_root
|
|
|
+ password: ${DB_PASSWORD} # 仅云端持有
|
|
|
+```
|
|
|
+
|
|
|
+---
|
|
|
+
|
|
|
+## 九、API调用示例
|
|
|
+
|
|
|
+### 9.1 手动触发本地→云端
|
|
|
+```bash
|
|
|
+curl -X POST http://localhost:9887/api/local/sync/trigger -H "Content-Type: application/json" -d '{
|
|
|
+ "tenantId": "TENANT_001",
|
|
|
+ "tableName": "device_data",
|
|
|
+ "direction": "UP"
|
|
|
+ }'
|
|
|
+```
|
|
|
+
|
|
|
+### 9.2 手动触发双向全量同步
|
|
|
+```bash
|
|
|
+curl -X POST http://localhost:9887/api/local/sync/trigger -H "Content-Type: application/json" -d '{
|
|
|
+ "tenantId": "TENANT_001",
|
|
|
+ "tableName": "device_data",
|
|
|
+ "direction": "BOTH",
|
|
|
+ "fullSync": true
|
|
|
+ }'
|
|
|
+```
|
|
|
+
|
|
|
+### 9.3 查询Agent状态
|
|
|
+```bash
|
|
|
+curl http://localhost:9887/api/local/sync/status
|
|
|
+```
|
|
|
+
|
|
|
+### 9.4 连通性测试
|
|
|
+```bash
|
|
|
+curl "http://localhost:9887/api/local/health/check?tenantId=TENANT_001"
|
|
|
+```
|
|
|
+
|
|
|
+---
|
|
|
+
|
|
|
+## 十、方案特性总结
|
|
|
+
|
|
|
+| 能力 | 实现方式 | 说明 |
|
|
|
+|------|---------|------|
|
|
|
+| **本地→云端** | PushService 定时扫描 + 批量推送 | 准实时(5秒级) |
|
|
|
+| **云端→本地** | PullService 长轮询 + 写入本地 | 准实时(3秒级) |
|
|
|
+| **手动触发** | REST接口提交任务到线程池 | 异步执行,不阻塞HTTP |
|
|
|
+| **自动实时** | 可接入Canal/Debezium替换轮询 | 当前用定时轮询兜底 |
|
|
|
+| **冲突解决** | sync_version 时间戳,云端优先 | 可配置为本地优先 |
|
|
|
+| **密码安全** | 本地只有API Token,无DB密码 | 云端Token可吊销、可限流 |
|
|
|
+| **断点续传** | sync_cursor 游标表 | 重启后从断点继续 |
|
|
|
+| **单向出站** | 所有连接由本地发起 | 云端零端口暴露 |
|
|
|
+
|
|
|
+---
|
|
|
+
|
|
|
+## 十一、扩展建议
|
|
|
+
|
|
|
+1. **CDC实时接入**:使用Canal或Debezium监听本地Binlog,替换轮询扫描,实现毫秒级同步
|
|
|
+2. **消息队列缓冲**:云端引入Kafka/RabbitMQ,削峰填谷,避免突发流量压垮云端DB
|
|
|
+3. **数据压缩**:大数据量时启用Gzip压缩,减少带宽占用
|
|
|
+4. **监控告警**:对同步延迟、失败率、队列堆积进行监控,超过阈值触发告警
|
|
|
+
|