|
|
@@ -1,796 +0,0 @@
|
|
|
-# 本地-云端双向数据同步与连通性测试方案
|
|
|
-
|
|
|
-## 一、方案概述
|
|
|
-
|
|
|
-### 1.1 背景与约束
|
|
|
-- **本地服务器A**:不可信区,不受管控,不能暴露云端数据库密码
|
|
|
-- **云服务器**:可信区,持有数据库连接凭证
|
|
|
-- **同步需求**:双向同步(本地↔云端),支持手动触发与自动同步
|
|
|
-- **连通性测试**:验证本地Agent与云端API通道的可用性
|
|
|
-
|
|
|
-### 1.2 核心设计原则
|
|
|
-| 原则 | 说明 |
|
|
|
-|------|------|
|
|
|
-| **单向出站** | 所有连接由本地主动发起(HTTPS出站),云端零暴露 |
|
|
|
-| **密码隔离** | 云端DB密码仅存在于云端服务,本地仅持有API Token |
|
|
|
-| **游标断点续传** | 基于`sync_version`游标,支持断网恢复后增量同步 |
|
|
|
-| **冲突解决** | 默认`last-write-wins`,云端优先;可配置为本地优先 |
|
|
|
-| **分层检测** | 连通性测试覆盖网络层、认证层、租户层 |
|
|
|
-
|
|
|
----
|
|
|
-
|
|
|
-## 二、架构设计
|
|
|
-
|
|
|
-```
|
|
|
-┌─────────────────────────────────────────────────────────────────┐
|
|
|
-│ 云服务器(可信区) │
|
|
|
-│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────────┐ │
|
|
|
-│ │ SyncReceive │ │ SyncCommand │ │ Cloud Data Service │ │
|
|
|
-│ │ Controller │ │ Controller │ │ (直连云端DB) │ │
|
|
|
-│ │ (接收推送) │ │ (下发指令/数据) │ │ │ │
|
|
|
-│ └──────┬─────────┘ └──────┬───────┘ └──────────────────────┘ │
|
|
|
-│ ▲ │ │
|
|
|
-│ │ │ │
|
|
|
-│ HTTPS│出站上推 │HTTPS出站长轮询拉取 │
|
|
|
-│ │ │ │
|
|
|
-│ ┌──────┴───────────────────┴──────────────────────────────────┐ │
|
|
|
-│ │ 本地服务器A(不可信区) │ │
|
|
|
-│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
|
|
|
-│ │ │ LocalPush │ │ LocalPull │ │ Local Data │ │ │
|
|
|
-│ │ │ Service │ │ Service │ │ Service │ │ │
|
|
|
-│ │ │ (读取本地变更) │ │ (拉云端指令) │ │ (操作本地DB) │ │ │
|
|
|
-│ │ └──────┬─────────┘ └──────┬───────┘ └──────┬───────┘ │ │
|
|
|
-│ │ │ │ │ │ │
|
|
|
-│ │ ┌──────┴───────────────────┴──────────────────┘ │ │
|
|
|
-│ │ │ Local Sync Agent (调度核心) │ │
|
|
|
-│ │ │ • Push线程:定时/实时扫描本地变更 → 推云端 │ │
|
|
|
-│ │ │ • Pull线程:长轮询云端指令队列 → 写入本地 │ │
|
|
|
-│ │ │ • 手动触发:REST接口接收任务 → 提交线程池 │ │
|
|
|
-│ │ └────────────────────────────────────────────────────────────┘ │
|
|
|
-│ └─────────────────────────────────────────────────────────────────┘
|
|
|
-```
|
|
|
-
|
|
|
----
|
|
|
-
|
|
|
-## 三、数据库表设计
|
|
|
-
|
|
|
-### 3.1 本地业务表(示例:device_data)
|
|
|
-```sql
|
|
|
-CREATE TABLE local_device_data (
|
|
|
- id BIGINT PRIMARY KEY AUTO_INCREMENT,
|
|
|
- tenant_id VARCHAR(50) NOT NULL,
|
|
|
- device_code VARCHAR(50),
|
|
|
- sensor_value DECIMAL(10,2),
|
|
|
- -- 同步控制字段
|
|
|
- sync_version BIGINT COMMENT '数据版本时间戳(毫秒)',
|
|
|
- sync_status TINYINT DEFAULT 0 COMMENT '0:未同步 1:已推送 2:推送失败',
|
|
|
- sync_direction VARCHAR(10) DEFAULT 'LOCAL' COMMENT 'LOCAL/CLOUD 数据来源',
|
|
|
- update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
|
|
|
- create_time DATETIME DEFAULT CURRENT_TIMESTAMP
|
|
|
-);
|
|
|
-```
|
|
|
-
|
|
|
-### 3.2 本地同步游标表
|
|
|
-```sql
|
|
|
-CREATE TABLE local_sync_cursor (
|
|
|
- id BIGINT PRIMARY KEY AUTO_INCREMENT,
|
|
|
- tenant_id VARCHAR(50) NOT NULL,
|
|
|
- table_name VARCHAR(50) NOT NULL,
|
|
|
- direction VARCHAR(10) NOT NULL COMMENT 'UP/DOWN',
|
|
|
- last_sync_version BIGINT DEFAULT 0 COMMENT '最后同步的版本号',
|
|
|
- last_sync_time DATETIME,
|
|
|
- UNIQUE KEY uk_tenant_table_dir (tenant_id, table_name, direction)
|
|
|
-);
|
|
|
-```
|
|
|
-
|
|
|
-### 3.3 云端下发队列(云端→本地)
|
|
|
-```sql
|
|
|
-CREATE TABLE cloud_sync_down_queue (
|
|
|
- id BIGINT PRIMARY KEY AUTO_INCREMENT,
|
|
|
- tenant_id VARCHAR(50) NOT NULL,
|
|
|
- table_name VARCHAR(50) NOT NULL,
|
|
|
- operation VARCHAR(20) COMMENT 'INSERT/UPDATE/DELETE',
|
|
|
- payload JSON NOT NULL COMMENT '完整数据JSON',
|
|
|
- sync_version BIGINT NOT NULL,
|
|
|
- status TINYINT DEFAULT 0 COMMENT '0:待拉取 1:已确认 2:失败',
|
|
|
- create_time DATETIME DEFAULT CURRENT_TIMESTAMP,
|
|
|
- INDEX idx_tenant_status (tenant_id, status)
|
|
|
-);
|
|
|
-```
|
|
|
-
|
|
|
----
|
|
|
-
|
|
|
-## 四、公共实体与枚举
|
|
|
-
|
|
|
-### 4.1 同步方向枚举
|
|
|
-```java
|
|
|
-@Getter
|
|
|
-@AllArgsConstructor
|
|
|
-public enum SyncDirection {
|
|
|
- UP("本地→云端"),
|
|
|
- DOWN("云端→本地"),
|
|
|
- BOTH("双向");
|
|
|
- private final String desc;
|
|
|
-}
|
|
|
-```
|
|
|
-
|
|
|
-### 4.2 数据包
|
|
|
-```java
|
|
|
-@Data
|
|
|
-@Builder
|
|
|
-public class SyncPacket<T> {
|
|
|
- private String tenantId;
|
|
|
- private String tableName;
|
|
|
- private SyncDirection direction;
|
|
|
- private List<T> dataList;
|
|
|
- private Long batchVersion;
|
|
|
- private Boolean hasMore;
|
|
|
- private String nonce;
|
|
|
-}
|
|
|
-```
|
|
|
-
|
|
|
-### 4.3 同步响应
|
|
|
-```java
|
|
|
-@Data
|
|
|
-@Builder
|
|
|
-public class SyncResponse {
|
|
|
- private boolean success;
|
|
|
- private String message;
|
|
|
- private Long confirmedVersion;
|
|
|
- private Integer acceptedCount;
|
|
|
-}
|
|
|
-```
|
|
|
-
|
|
|
-### 4.4 手动触发请求
|
|
|
-```java
|
|
|
-@Data
|
|
|
-public class SyncTriggerRequest {
|
|
|
- private String tenantId;
|
|
|
- private String tableName;
|
|
|
- private SyncDirection direction;
|
|
|
- private Boolean fullSync; // 是否全量同步(重置游标)
|
|
|
-}
|
|
|
-```
|
|
|
-
|
|
|
----
|
|
|
-
|
|
|
-## 五、云端服务实现
|
|
|
-
|
|
|
-### 5.1 云端接收控制器(本地→云端)
|
|
|
-```java
|
|
|
-@Slf4j
|
|
|
-@RestController
|
|
|
-@RequestMapping("/api/cloud/sync")
|
|
|
-@RequiredArgsConstructor
|
|
|
-public class CloudSyncReceiveController {
|
|
|
-
|
|
|
- private final CloudSyncService cloudSyncService;
|
|
|
- private final CloudAuthService cloudAuthService;
|
|
|
-
|
|
|
- @PostMapping("/receive")
|
|
|
- public SyncResponse receive(
|
|
|
- @RequestHeader("X-App-Token") String token,
|
|
|
- @RequestHeader("X-Sign") String sign,
|
|
|
- @RequestBody @Valid SyncPacket packet) {
|
|
|
-
|
|
|
- if (!cloudAuthService.validate(token, sign, packet)) {
|
|
|
- return SyncResponse.builder().success(false).message("认证失败或请求已过期").build();
|
|
|
- }
|
|
|
-
|
|
|
- if (!cloudSyncService.validateTenant(packet.getTenantId())) {
|
|
|
- return SyncResponse.builder().success(false).message("租户不存在或已停用").build();
|
|
|
- }
|
|
|
-
|
|
|
- try {
|
|
|
- int count = cloudSyncService.batchUpsert(packet);
|
|
|
- return SyncResponse.builder()
|
|
|
- .success(true)
|
|
|
- .message("接收成功")
|
|
|
- .acceptedCount(count)
|
|
|
- .confirmedVersion(packet.getBatchVersion())
|
|
|
- .build();
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("云端接收数据失败, tenantId={}", packet.getTenantId(), e);
|
|
|
- return SyncResponse.builder().success(false).message("写入失败:" + e.getMessage()).build();
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
-```
|
|
|
-
|
|
|
-### 5.2 云端下发控制器(云端→本地)
|
|
|
-```java
|
|
|
-@Slf4j
|
|
|
-@RestController
|
|
|
-@RequestMapping("/api/cloud/sync")
|
|
|
-@RequiredArgsConstructor
|
|
|
-public class CloudSyncCommandController {
|
|
|
-
|
|
|
- private final CloudSyncService cloudSyncService;
|
|
|
- private final CloudAuthService cloudAuthService;
|
|
|
-
|
|
|
- @GetMapping("/poll")
|
|
|
- public SyncPacket poll(
|
|
|
- @RequestHeader("X-App-Token") String token,
|
|
|
- @RequestParam String tenantId,
|
|
|
- @RequestParam String tableName,
|
|
|
- @RequestParam(required = false) Long lastVersion) {
|
|
|
-
|
|
|
- if (!cloudAuthService.validateToken(token)) {
|
|
|
- return null;
|
|
|
- }
|
|
|
- return cloudSyncService.pollDownQueue(tenantId, tableName, lastVersion);
|
|
|
- }
|
|
|
-
|
|
|
- @PostMapping("/poll/ack")
|
|
|
- public SyncResponse ackPoll(
|
|
|
- @RequestHeader("X-App-Token") String token,
|
|
|
- @RequestBody List<Long> queueIds) {
|
|
|
-
|
|
|
- if (!cloudAuthService.validateToken(token)) {
|
|
|
- return SyncResponse.builder().success(false).message("认证失败").build();
|
|
|
- }
|
|
|
- cloudSyncService.ackDownQueue(queueIds);
|
|
|
- return SyncResponse.builder().success(true).build();
|
|
|
- }
|
|
|
-}
|
|
|
-```
|
|
|
-
|
|
|
-### 5.3 云端同步服务(冲突解决:云端优先)
|
|
|
-```java
|
|
|
-@Slf4j
|
|
|
-@Service
|
|
|
-@RequiredArgsConstructor
|
|
|
-public class CloudSyncService {
|
|
|
-
|
|
|
- private final CloudDeviceDataMapper deviceDataMapper;
|
|
|
- private final CloudSyncDownQueueMapper downQueueMapper;
|
|
|
- private final TenantConfigMapper tenantConfigMapper;
|
|
|
-
|
|
|
- public boolean validateTenant(String tenantId) {
|
|
|
- TenantConfig config = tenantConfigMapper.selectByTenantId(tenantId);
|
|
|
- return config != null && config.getStatus() == 1;
|
|
|
- }
|
|
|
-
|
|
|
- @Transactional(rollbackFor = Exception.class)
|
|
|
- public int batchUpsert(SyncPacket packet) {
|
|
|
- List<CloudDeviceData> list = convertToCloudEntity(packet);
|
|
|
- int count = 0;
|
|
|
- for (CloudDeviceData data : list) {
|
|
|
- CloudDeviceData exist = deviceDataMapper.selectByUk(data.getTenantId(), data.getDeviceCode());
|
|
|
- if (exist == null) {
|
|
|
- deviceDataMapper.insert(data);
|
|
|
- count++;
|
|
|
- } else if (data.getSyncVersion() >= exist.getSyncVersion()) {
|
|
|
- data.setId(exist.getId());
|
|
|
- deviceDataMapper.updateById(data);
|
|
|
- count++;
|
|
|
- }
|
|
|
- }
|
|
|
- return count;
|
|
|
- }
|
|
|
-
|
|
|
- public SyncPacket pollDownQueue(String tenantId, String tableName, Long lastVersion) {
|
|
|
- if (lastVersion == null) lastVersion = 0L;
|
|
|
- List<CloudSyncDownQueue> queueList = downQueueMapper.selectPending(
|
|
|
- tenantId, tableName, lastVersion, 100);
|
|
|
-
|
|
|
- if (queueList.isEmpty()) return null;
|
|
|
-
|
|
|
- Long maxVersion = queueList.stream()
|
|
|
- .mapToLong(CloudSyncDownQueue::getSyncVersion)
|
|
|
- .max().orElse(lastVersion);
|
|
|
-
|
|
|
- return SyncPacket.builder()
|
|
|
- .tenantId(tenantId)
|
|
|
- .tableName(tableName)
|
|
|
- .direction(SyncDirection.DOWN)
|
|
|
- .dataList(queueList)
|
|
|
- .batchVersion(maxVersion)
|
|
|
- .hasMore(queueList.size() >= 100)
|
|
|
- .build();
|
|
|
- }
|
|
|
-
|
|
|
- public void ackDownQueue(List<Long> queueIds) {
|
|
|
- if (queueIds == null || queueIds.isEmpty()) return;
|
|
|
- downQueueMapper.updateStatus(queueIds, 1);
|
|
|
- }
|
|
|
-}
|
|
|
-```
|
|
|
-
|
|
|
----
|
|
|
-
|
|
|
-## 六、本地服务实现
|
|
|
-
|
|
|
-### 6.1 手动触发接口
|
|
|
-```java
|
|
|
-@RestController
|
|
|
-@RequestMapping("/api/local/sync")
|
|
|
-@RequiredArgsConstructor
|
|
|
-public class LocalSyncTriggerController {
|
|
|
-
|
|
|
- private final LocalSyncAgent syncAgent;
|
|
|
-
|
|
|
- @PostMapping("/trigger")
|
|
|
- public String trigger(@RequestBody @Valid SyncTriggerRequest request) {
|
|
|
- syncAgent.submitTask(request);
|
|
|
- return "同步任务已提交: " + request.getDirection() +
|
|
|
- ", tenant=" + request.getTenantId() +
|
|
|
- ", table=" + request.getTableName();
|
|
|
- }
|
|
|
-
|
|
|
- @GetMapping("/status")
|
|
|
- public String status() {
|
|
|
- return syncAgent.getStatus();
|
|
|
- }
|
|
|
-}
|
|
|
-```
|
|
|
-
|
|
|
-### 6.2 本地Agent核心调度器
|
|
|
-```java
|
|
|
-@Slf4j
|
|
|
-@Component
|
|
|
-@RequiredArgsConstructor
|
|
|
-public class LocalSyncAgent {
|
|
|
-
|
|
|
- private final LocalPushService pushService;
|
|
|
- private final LocalPullService pullService;
|
|
|
- private final ExecutorService executor = new ThreadPoolExecutor(
|
|
|
- 4, 8, 60, TimeUnit.SECONDS,
|
|
|
- new LinkedBlockingQueue<>(100),
|
|
|
- new ThreadFactoryBuilder().setNameFormat("sync-agent-%d").build(),
|
|
|
- new ThreadPoolExecutor.CallerRunsPolicy()
|
|
|
- );
|
|
|
-
|
|
|
- @PostConstruct
|
|
|
- public void init() {
|
|
|
- log.info("本地同步Agent启动");
|
|
|
- executor.submit(this::pullLoop);
|
|
|
- }
|
|
|
-
|
|
|
- @PreDestroy
|
|
|
- public void destroy() {
|
|
|
- executor.shutdown();
|
|
|
- }
|
|
|
-
|
|
|
- @Scheduled(fixedDelay = 5000)
|
|
|
- public void autoPush() {
|
|
|
- try {
|
|
|
- pushService.pushAllPending("device_data");
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("自动推送失败", e);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private void pullLoop() {
|
|
|
- while (!Thread.currentThread().isInterrupted()) {
|
|
|
- try {
|
|
|
- pullService.pullAndApply("device_data");
|
|
|
- Thread.sleep(3000);
|
|
|
- } catch (InterruptedException e) {
|
|
|
- Thread.currentThread().interrupt();
|
|
|
- break;
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("拉取云端数据失败", e);
|
|
|
- try {
|
|
|
- Thread.sleep(10000);
|
|
|
- } catch (InterruptedException ie) {
|
|
|
- Thread.currentThread().interrupt();
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public void submitTask(SyncTriggerRequest request) {
|
|
|
- executor.submit(() -> {
|
|
|
- try {
|
|
|
- if (request.getDirection() == SyncDirection.UP || request.getDirection() == SyncDirection.BOTH) {
|
|
|
- if (Boolean.TRUE.equals(request.getFullSync())) {
|
|
|
- pushService.resetCursor(request.getTenantId(), "device_data");
|
|
|
- }
|
|
|
- pushService.pushByTenant(request.getTenantId(), "device_data");
|
|
|
- }
|
|
|
- if (request.getDirection() == SyncDirection.DOWN || request.getDirection() == SyncDirection.BOTH) {
|
|
|
- pullService.pullByTenant(request.getTenantId(), "device_data");
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("手动同步任务失败", e);
|
|
|
- }
|
|
|
- });
|
|
|
- }
|
|
|
-
|
|
|
- public String getStatus() {
|
|
|
- return "运行中, 活跃线程:" + ((ThreadPoolExecutor) executor).getActiveCount();
|
|
|
- }
|
|
|
-}
|
|
|
-```
|
|
|
-
|
|
|
-### 6.3 本地推送服务(本地→云端)
|
|
|
-```java
|
|
|
-@Slf4j
|
|
|
-@Service
|
|
|
-@RequiredArgsConstructor
|
|
|
-public class LocalPushService {
|
|
|
-
|
|
|
- private final LocalDeviceDataMapper deviceDataMapper;
|
|
|
- private final LocalSyncCursorMapper cursorMapper;
|
|
|
- private final RestTemplate restTemplate;
|
|
|
- private final ObjectMapper objectMapper;
|
|
|
-
|
|
|
- @Value("${cloud.api.endpoint}")
|
|
|
- private String cloudEndpoint;
|
|
|
- @Value("${cloud.api.token}")
|
|
|
- private String apiToken;
|
|
|
-
|
|
|
- public void pushAllPending(String tableName) {
|
|
|
- List<String> tenantIds = deviceDataMapper.selectPendingTenantIds(tableName);
|
|
|
- for (String tenantId : tenantIds) {
|
|
|
- try {
|
|
|
- pushByTenant(tenantId, tableName);
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("推送租户{}失败", tenantId, e);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public void pushByTenant(String tenantId, String tableName) {
|
|
|
- LocalSyncCursor cursor = getOrInitCursor(tenantId, tableName, "UP");
|
|
|
- Long lastVersion = cursor.getLastSyncVersion();
|
|
|
-
|
|
|
- while (true) {
|
|
|
- List<LocalDeviceData> dataList = deviceDataMapper.selectByVersionGt(
|
|
|
- tenantId, lastVersion, 100);
|
|
|
- if (dataList.isEmpty()) break;
|
|
|
-
|
|
|
- Long batchVersion = dataList.stream()
|
|
|
- .mapToLong(LocalDeviceData::getSyncVersion)
|
|
|
- .max().orElse(lastVersion);
|
|
|
-
|
|
|
- SyncPacket packet = SyncPacket.builder()
|
|
|
- .tenantId(tenantId)
|
|
|
- .tableName(tableName)
|
|
|
- .direction(SyncDirection.UP)
|
|
|
- .dataList(dataList)
|
|
|
- .batchVersion(batchVersion)
|
|
|
- .hasMore(dataList.size() >= 100)
|
|
|
- .nonce(UUID.randomUUID().toString())
|
|
|
- .build();
|
|
|
-
|
|
|
- SyncResponse response = pushToCloud(packet);
|
|
|
- if (response != null && response.isSuccess()) {
|
|
|
- lastVersion = response.getConfirmedVersion();
|
|
|
- updateCursor(tenantId, tableName, "UP", lastVersion);
|
|
|
- List<Long> ids = dataList.stream().map(LocalDeviceData::getId).toList();
|
|
|
- deviceDataMapper.markSynced(ids);
|
|
|
- if (Boolean.FALSE.equals(packet.getHasMore())) break;
|
|
|
- } else {
|
|
|
- log.error("云端接收失败: {}", response != null ? response.getMessage() : "无响应");
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public void resetCursor(String tenantId, String tableName) {
|
|
|
- cursorMapper.updateLastVersion(tenantId, tableName, "UP", 0L);
|
|
|
- }
|
|
|
-
|
|
|
- private SyncResponse pushToCloud(SyncPacket packet) {
|
|
|
- try {
|
|
|
- HttpHeaders headers = new HttpHeaders();
|
|
|
- headers.setContentType(MediaType.APPLICATION_JSON);
|
|
|
- headers.set("X-App-Token", apiToken);
|
|
|
- headers.set("X-Sign", HmacUtils.sign(objectMapper.writeValueAsString(packet), apiToken));
|
|
|
-
|
|
|
- ResponseEntity<SyncResponse> response = restTemplate.exchange(
|
|
|
- cloudEndpoint + "/api/cloud/sync/receive",
|
|
|
- HttpMethod.POST,
|
|
|
- new HttpEntity<>(packet, headers),
|
|
|
- SyncResponse.class
|
|
|
- );
|
|
|
- return response.getBody();
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("推送云端网络异常", e);
|
|
|
- return null;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private LocalSyncCursor getOrInitCursor(String tenantId, String tableName, String direction) {
|
|
|
- LocalSyncCursor cursor = cursorMapper.selectByUk(tenantId, tableName, direction);
|
|
|
- if (cursor == null) {
|
|
|
- cursor = new LocalSyncCursor();
|
|
|
- cursor.setTenantId(tenantId);
|
|
|
- cursor.setTableName(tableName);
|
|
|
- cursor.setDirection(direction);
|
|
|
- cursor.setLastSyncVersion(0L);
|
|
|
- cursorMapper.insert(cursor);
|
|
|
- }
|
|
|
- return cursor;
|
|
|
- }
|
|
|
-
|
|
|
- private void updateCursor(String tenantId, String tableName, String direction, Long version) {
|
|
|
- cursorMapper.updateLastVersion(tenantId, tableName, direction, version);
|
|
|
- }
|
|
|
-}
|
|
|
-```
|
|
|
-
|
|
|
-### 6.4 本地拉取服务(云端→本地)
|
|
|
-```java
|
|
|
-@Slf4j
|
|
|
-@Service
|
|
|
-@RequiredArgsConstructor
|
|
|
-public class LocalPullService {
|
|
|
-
|
|
|
- private final LocalDeviceDataMapper deviceDataMapper;
|
|
|
- private final LocalSyncCursorMapper cursorMapper;
|
|
|
- private final RestTemplate restTemplate;
|
|
|
- private final ObjectMapper objectMapper;
|
|
|
-
|
|
|
- @Value("${cloud.api.endpoint}")
|
|
|
- private String cloudEndpoint;
|
|
|
- @Value("${cloud.api.token}")
|
|
|
- private String apiToken;
|
|
|
-
|
|
|
- public void pullAndApply(String tableName) {
|
|
|
- List<String> tenantIds = cursorMapper.selectAllTenantIds();
|
|
|
- for (String tenantId : tenantIds) {
|
|
|
- try {
|
|
|
- pullByTenant(tenantId, tableName);
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("拉取租户{}数据失败", tenantId, e);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public void pullByTenant(String tenantId, String tableName) {
|
|
|
- LocalSyncCursor cursor = getOrInitCursor(tenantId, tableName, "DOWN");
|
|
|
- Long lastVersion = cursor.getLastSyncVersion();
|
|
|
-
|
|
|
- while (true) {
|
|
|
- SyncPacket packet = pollCloud(tenantId, tableName, lastVersion);
|
|
|
- if (packet == null || packet.getDataList() == null || packet.getDataList().isEmpty()) break;
|
|
|
-
|
|
|
- List<CloudSyncDownQueue> queueList = (List<CloudSyncDownQueue>) packet.getDataList();
|
|
|
- List<Long> ackIds = queueList.stream().map(CloudSyncDownQueue::getId).collect(Collectors.toList());
|
|
|
-
|
|
|
- for (CloudSyncDownQueue queue : queueList) {
|
|
|
- applyToLocal(queue);
|
|
|
- }
|
|
|
-
|
|
|
- lastVersion = packet.getBatchVersion();
|
|
|
- updateCursor(tenantId, tableName, "DOWN", lastVersion);
|
|
|
- ackCloud(ackIds);
|
|
|
-
|
|
|
- if (Boolean.FALSE.equals(packet.getHasMore())) break;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private void applyToLocal(CloudSyncDownQueue queue) {
|
|
|
- try {
|
|
|
- LocalDeviceData data = objectMapper.readValue(queue.getPayload().toString(), LocalDeviceData.class);
|
|
|
- data.setSyncVersion(queue.getSyncVersion());
|
|
|
- data.setSyncDirection("CLOUD");
|
|
|
- data.setSyncStatus(1);
|
|
|
-
|
|
|
- LocalDeviceData exist = deviceDataMapper.selectByUk(data.getTenantId(), data.getDeviceCode());
|
|
|
- if (exist == null) {
|
|
|
- deviceDataMapper.insert(data);
|
|
|
- } else if (queue.getSyncVersion() >= exist.getSyncVersion()) {
|
|
|
- data.setId(exist.getId());
|
|
|
- deviceDataMapper.updateById(data);
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("应用云端数据到本地失败, queueId={}", queue.getId(), e);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private SyncPacket pollCloud(String tenantId, String tableName, Long lastVersion) {
|
|
|
- try {
|
|
|
- String url = UriComponentsBuilder.fromHttpUrl(cloudEndpoint + "/api/cloud/sync/poll")
|
|
|
- .queryParam("tenantId", tenantId)
|
|
|
- .queryParam("tableName", tableName)
|
|
|
- .queryParam("lastVersion", lastVersion)
|
|
|
- .toUriString();
|
|
|
- return restTemplate.getForObject(url, SyncPacket.class);
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("轮询云端失败", e);
|
|
|
- return null;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private void ackCloud(List<Long> queueIds) {
|
|
|
- try {
|
|
|
- restTemplate.postForObject(
|
|
|
- cloudEndpoint + "/api/cloud/sync/poll/ack",
|
|
|
- queueIds,
|
|
|
- SyncResponse.class
|
|
|
- );
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("ACK云端失败", e);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private LocalSyncCursor getOrInitCursor(String tenantId, String tableName, String direction) {
|
|
|
- LocalSyncCursor cursor = cursorMapper.selectByUk(tenantId, tableName, direction);
|
|
|
- if (cursor == null) {
|
|
|
- cursor = new LocalSyncCursor();
|
|
|
- cursor.setTenantId(tenantId);
|
|
|
- cursor.setTableName(tableName);
|
|
|
- cursor.setDirection(direction);
|
|
|
- cursor.setLastSyncVersion(0L);
|
|
|
- cursorMapper.insert(cursor);
|
|
|
- }
|
|
|
- return cursor;
|
|
|
- }
|
|
|
-
|
|
|
- private void updateCursor(String tenantId, String tableName, String direction, Long version) {
|
|
|
- cursorMapper.updateLastVersion(tenantId, tableName, direction, version);
|
|
|
- }
|
|
|
-}
|
|
|
-```
|
|
|
-
|
|
|
----
|
|
|
-
|
|
|
-## 七、连通性测试接口
|
|
|
-
|
|
|
-### 7.1 测试目标转变
|
|
|
-| 原目标 | 新目标 |
|
|
|
-|--------|--------|
|
|
|
-| 本地能否连云端DB | 本地Agent能否正常推送/拉取 |
|
|
|
-| 云端租户配置是否存在 | 云端API返回的认证结果 |
|
|
|
-
|
|
|
-### 7.2 本地连通性测试接口
|
|
|
-```java
|
|
|
-@RestController
|
|
|
-@RequestMapping("/api/local/health")
|
|
|
-@RequiredArgsConstructor
|
|
|
-public class LocalHealthCheckController {
|
|
|
-
|
|
|
- private final RestTemplate restTemplate;
|
|
|
- @Value("${cloud.api.endpoint}")
|
|
|
- private String cloudEndpoint;
|
|
|
- @Value("${cloud.api.token}")
|
|
|
- private String apiToken;
|
|
|
-
|
|
|
- @GetMapping("/check")
|
|
|
- public HealthCheckResponse check(@RequestParam String tenantId) {
|
|
|
- long start = System.currentTimeMillis();
|
|
|
-
|
|
|
- try {
|
|
|
- String pingUrl = cloudEndpoint + "/api/cloud/sync/ping";
|
|
|
- restTemplate.getForObject(pingUrl, String.class);
|
|
|
-
|
|
|
- HttpHeaders headers = new HttpHeaders();
|
|
|
- headers.set("X-App-Token", apiToken);
|
|
|
- HttpEntity entity = new HttpEntity<>(headers);
|
|
|
- ResponseEntity<String> authResp = restTemplate.exchange(
|
|
|
- cloudEndpoint + "/api/cloud/sync/auth-check?tenantId=" + tenantId,
|
|
|
- HttpMethod.GET, entity, String.class);
|
|
|
-
|
|
|
- boolean tenantExists = "VALID".equals(authResp.getBody());
|
|
|
-
|
|
|
- return HealthCheckResponse.builder()
|
|
|
- .success(true)
|
|
|
- .networkReachable(true)
|
|
|
- .authValid(true)
|
|
|
- .tenantExists(tenantExists)
|
|
|
- .costTime(System.currentTimeMillis() - start)
|
|
|
- .build();
|
|
|
-
|
|
|
- } catch (ResourceAccessException e) {
|
|
|
- return HealthCheckResponse.builder()
|
|
|
- .success(false)
|
|
|
- .message("网络不可达:" + e.getMessage())
|
|
|
- .build();
|
|
|
- } catch (HttpClientErrorException.Unauthorized e) {
|
|
|
- return HealthCheckResponse.builder()
|
|
|
- .success(false)
|
|
|
- .message("认证失败:Token无效")
|
|
|
- .build();
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
-```
|
|
|
-
|
|
|
-### 7.3 响应示例
|
|
|
-```json
|
|
|
-{
|
|
|
- "success": true,
|
|
|
- "networkReachable": true,
|
|
|
- "authValid": true,
|
|
|
- "tenantExists": true,
|
|
|
- "costTime": 245
|
|
|
-}
|
|
|
-```
|
|
|
-
|
|
|
----
|
|
|
-
|
|
|
-## 八、配置说明
|
|
|
-
|
|
|
-### 8.1 本地配置(application.yml)
|
|
|
-```yaml
|
|
|
-server:
|
|
|
- port: 9887
|
|
|
-
|
|
|
-cloud:
|
|
|
- api:
|
|
|
- endpoint: https://cloud.example.com
|
|
|
- token: ${CLOUD_API_TOKEN} # 环境变量注入,非DB密码
|
|
|
-
|
|
|
-sync:
|
|
|
- push:
|
|
|
- batch-size: 100
|
|
|
- fixed-delay-ms: 5000 # 准实时:每5秒扫描
|
|
|
- pull:
|
|
|
- interval-ms: 3000 # 长轮询间隔
|
|
|
- fail-backoff-ms: 10000 # 失败退避
|
|
|
-```
|
|
|
-
|
|
|
-### 8.2 云端配置
|
|
|
-```yaml
|
|
|
-server:
|
|
|
- port: 8080
|
|
|
-
|
|
|
-spring:
|
|
|
- datasource:
|
|
|
- url: jdbc:mysql://localhost:3306/cloud_db
|
|
|
- username: cloud_root
|
|
|
- password: ${DB_PASSWORD} # 仅云端持有
|
|
|
-```
|
|
|
-
|
|
|
----
|
|
|
-
|
|
|
-## 九、API调用示例
|
|
|
-
|
|
|
-### 9.1 手动触发本地→云端
|
|
|
-```bash
|
|
|
-curl -X POST http://localhost:9887/api/local/sync/trigger -H "Content-Type: application/json" -d '{
|
|
|
- "tenantId": "TENANT_001",
|
|
|
- "tableName": "device_data",
|
|
|
- "direction": "UP"
|
|
|
- }'
|
|
|
-```
|
|
|
-
|
|
|
-### 9.2 手动触发双向全量同步
|
|
|
-```bash
|
|
|
-curl -X POST http://localhost:9887/api/local/sync/trigger -H "Content-Type: application/json" -d '{
|
|
|
- "tenantId": "TENANT_001",
|
|
|
- "tableName": "device_data",
|
|
|
- "direction": "BOTH",
|
|
|
- "fullSync": true
|
|
|
- }'
|
|
|
-```
|
|
|
-
|
|
|
-### 9.3 查询Agent状态
|
|
|
-```bash
|
|
|
-curl http://localhost:9887/api/local/sync/status
|
|
|
-```
|
|
|
-
|
|
|
-### 9.4 连通性测试
|
|
|
-```bash
|
|
|
-curl "http://localhost:9887/api/local/health/check?tenantId=TENANT_001"
|
|
|
-```
|
|
|
-
|
|
|
----
|
|
|
-
|
|
|
-## 十、方案特性总结
|
|
|
-
|
|
|
-| 能力 | 实现方式 | 说明 |
|
|
|
-|------|---------|------|
|
|
|
-| **本地→云端** | PushService 定时扫描 + 批量推送 | 准实时(5秒级) |
|
|
|
-| **云端→本地** | PullService 长轮询 + 写入本地 | 准实时(3秒级) |
|
|
|
-| **手动触发** | REST接口提交任务到线程池 | 异步执行,不阻塞HTTP |
|
|
|
-| **自动实时** | 可接入Canal/Debezium替换轮询 | 当前用定时轮询兜底 |
|
|
|
-| **冲突解决** | sync_version 时间戳,云端优先 | 可配置为本地优先 |
|
|
|
-| **密码安全** | 本地只有API Token,无DB密码 | 云端Token可吊销、可限流 |
|
|
|
-| **断点续传** | sync_cursor 游标表 | 重启后从断点继续 |
|
|
|
-| **单向出站** | 所有连接由本地发起 | 云端零端口暴露 |
|
|
|
-
|
|
|
----
|
|
|
-
|
|
|
-## 十一、扩展建议
|
|
|
-
|
|
|
-1. **CDC实时接入**:使用Canal或Debezium监听本地Binlog,替换轮询扫描,实现毫秒级同步
|
|
|
-2. **消息队列缓冲**:云端引入Kafka/RabbitMQ,削峰填谷,避免突发流量压垮云端DB
|
|
|
-3. **数据压缩**:大数据量时启用Gzip压缩,减少带宽占用
|
|
|
-4. **监控告警**:对同步延迟、失败率、队列堆积进行监控,超过阈值触发告警
|
|
|
-
|