Browse Source

云平台代码提交

fuyuchuan 1 ngày trước cách đây
mục cha
commit
1dff4dd673
24 tập tin đã thay đổi với 339 bổ sung106 xóa
  1. 5 3
      service-issue/service-issue-biz/src/main/java/com/usky/issue/controller/api/CloudIntegrationApiController.java
  2. 3 2
      service-issue/service-issue-biz/src/main/java/com/usky/issue/controller/web/CloudSyncController.java
  3. 2 0
      service-issue/service-issue-biz/src/main/java/com/usky/issue/mapper/SysUserMapper.java
  4. 5 4
      service-issue/service-issue-biz/src/main/java/com/usky/issue/service/CloudSyncPublishService.java
  5. 1 1
      service-issue/service-issue-biz/src/main/java/com/usky/issue/service/CloudSyncReceiveService.java
  6. 25 25
      service-issue/service-issue-biz/src/main/java/com/usky/issue/service/CloudSyncService.java
  7. 4 2
      service-issue/service-issue-biz/src/main/java/com/usky/issue/service/CloudSyncTaskRunner.java
  8. 7 1
      service-issue/service-issue-biz/src/main/java/com/usky/issue/service/LocalPullService.java
  9. 1 1
      service-issue/service-issue-biz/src/main/java/com/usky/issue/service/LocalSyncAgent.java
  10. 33 10
      service-issue/service-issue-biz/src/main/java/com/usky/issue/service/client/CloudPlatformClient.java
  11. 8 0
      service-issue/service-issue-biz/src/main/java/com/usky/issue/service/constant/CloudIntegrationConstants.java
  12. 5 1
      service-issue/service-issue-biz/src/main/java/com/usky/issue/service/enums/SyncDirection.java
  13. 2 1
      service-issue/service-issue-biz/src/main/java/com/usky/issue/service/impl/CloudConfigServiceImpl.java
  14. 39 19
      service-issue/service-issue-biz/src/main/java/com/usky/issue/service/impl/CloudSyncPublishServiceImpl.java
  15. 5 2
      service-issue/service-issue-biz/src/main/java/com/usky/issue/service/impl/CloudSyncReceiveServiceImpl.java
  16. 19 8
      service-issue/service-issue-biz/src/main/java/com/usky/issue/service/impl/CloudSyncServiceImpl.java
  17. 27 12
      service-issue/service-issue-biz/src/main/java/com/usky/issue/service/impl/CloudSyncTaskRunnerImpl.java
  18. 46 6
      service-issue/service-issue-biz/src/main/java/com/usky/issue/service/impl/LocalPullServiceImpl.java
  19. 21 8
      service-issue/service-issue-biz/src/main/java/com/usky/issue/service/impl/LocalSyncAgentImpl.java
  20. 46 0
      service-issue/service-issue-biz/src/main/java/com/usky/issue/service/support/CloudEntitySupport.java
  21. 1 0
      service-issue/service-issue-biz/src/main/java/com/usky/issue/service/vo/CloudConfigResponse.java
  22. 25 0
      service-issue/service-issue-biz/src/main/java/com/usky/issue/service/vo/CloudPollResult.java
  23. 6 0
      service-issue/service-issue-biz/src/main/java/com/usky/issue/service/vo/ManualSyncRequest.java
  24. 3 0
      service-issue/service-issue-biz/src/main/java/com/usky/issue/service/vo/SyncPacket.java

+ 5 - 3
service-issue/service-issue-biz/src/main/java/com/usky/issue/controller/api/CloudIntegrationApiController.java

@@ -135,12 +135,14 @@ public class CloudIntegrationApiController {
             @RequestHeader(value = CloudIntegrationConstants.HEADER_API_KEY, required = false) String apiKey,
             @RequestParam String tenantId,
             @RequestParam String tableName,
-            @RequestParam(required = false) Long lastVersion) {
+            @RequestParam(required = false) Long lastVersion,
+            @RequestParam(required = false, defaultValue = "false") boolean forceBackfill) {
 
         if (!cloudAuthService.validateToken(apiKey, Integer.valueOf(tenantId))) {
-            return ApiResult.success(null);
+            log.warn("[poll] 认证失败 tenantId={}, tableName={}", tenantId, tableName);
+            return ApiResult.error(CloudIntegrationConstants.MSG_POLL_AUTH_FAILED);
         }
-        return ApiResult.success(cloudSyncReceiveService.pollDownQueue(tenantId, tableName, lastVersion));
+        return ApiResult.success(cloudSyncReceiveService.pollDownQueue(tenantId, tableName, lastVersion, forceBackfill));
     }
 
     /**

+ 3 - 2
service-issue/service-issue-biz/src/main/java/com/usky/issue/controller/web/CloudSyncController.java

@@ -1,12 +1,12 @@
 package com.usky.issue.controller.web;
 
 import com.usky.common.core.bean.ApiResult;
-import com.usky.common.security.utils.SecurityUtils;
 import com.usky.issue.service.vo.ManualSyncRequest;
 import com.usky.issue.service.vo.SyncStatusResponse;
 import com.usky.issue.service.vo.SyncTaskResponse;
 import com.usky.issue.service.CloudSyncService;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.validation.annotation.Validated;
 import org.springframework.web.bind.annotation.*;
 
 import javax.validation.Valid;
@@ -19,6 +19,7 @@ import java.util.List;
  * @date 2026-05-21
  */
 @RestController
+@Validated
 @RequestMapping("/cloud")
 public class CloudSyncController {
 
@@ -32,7 +33,7 @@ public class CloudSyncController {
 
     @PostMapping("/sync")
     public ApiResult<SyncTaskResponse> manualSync(@RequestBody ManualSyncRequest request) {
-        return ApiResult.success(cloudSyncService.triggerManualSync(request.getType()));
+        return ApiResult.success(cloudSyncService.triggerManualSync(request.getType(), request.getDirection()));
     }
 
     @GetMapping("/sync/task/{taskId}")

+ 2 - 0
service-issue/service-issue-biz/src/main/java/com/usky/issue/mapper/SysUserMapper.java

@@ -1,5 +1,6 @@
 package com.usky.issue.mapper;
 
+import com.baomidou.dynamic.datasource.annotation.DS;
 import com.usky.common.mybatis.core.CrudMapper;
 import com.usky.issue.domain.SysUser;
 import org.springframework.stereotype.Repository;
@@ -10,6 +11,7 @@ import org.springframework.stereotype.Repository;
  * @author fyc
  * @date 2026-06-29
  */
+@DS("usky-cloud")
 @Repository
 public interface SysUserMapper extends CrudMapper<SysUser> {
 }

+ 5 - 4
service-issue/service-issue-biz/src/main/java/com/usky/issue/service/CloudSyncPublishService.java

@@ -15,8 +15,9 @@ public interface CloudSyncPublishService {
      */
     void enqueueUser(SysUser user, String operation);
 
-    /**
-     * 首次拉取时从 sys_user 回填下发队列(仅当前租户)
-     */
-    void backfillUsersIfNeeded(Integer tenantId, Long lastVersion);
+    /** 首次拉取时从 sys_user 回填下发队列(仅当前租户) */
+    int backfillUsersIfNeeded(Integer tenantId, Long lastVersion, boolean force);
+
+    /** 统计租户在 sys_user 中的有效用户数 */
+    int countSysUsers(Integer tenantId);
 }

+ 1 - 1
service-issue/service-issue-biz/src/main/java/com/usky/issue/service/CloudSyncReceiveService.java

@@ -17,7 +17,7 @@ public interface CloudSyncReceiveService {
 
     int batchUpsert(SyncPacket packet);
 
-    SyncPacket pollDownQueue(String tenantId, String tableName, Long lastVersion);
+    SyncPacket pollDownQueue(String tenantId, String tableName, Long lastVersion, boolean forceBackfill);
 
     void ackDownQueue(List<Long> queueIds);
 

+ 25 - 25
service-issue/service-issue-biz/src/main/java/com/usky/issue/service/CloudSyncService.java

@@ -1,25 +1,25 @@
-package com.usky.issue.service;

-

-import com.usky.issue.service.vo.SyncStatusResponse;

-import com.usky.issue.service.vo.SyncTaskResponse;

-

-import java.util.List;

-

-/**

- * 云平台数据同步服务

- *

- * @author fyc

- * @date 2026-05-21

- */

-public interface CloudSyncService {

-

-    List<SyncStatusResponse> listSyncStatus();

-

-    SyncTaskResponse triggerManualSync(String syncType);

-

-    SyncTaskResponse getTaskProgress(Long taskId);

-

-    void onOrgChanged();

-

-    void onUserChanged();

-}

+package com.usky.issue.service;
+
+import com.usky.issue.service.vo.SyncStatusResponse;
+import com.usky.issue.service.vo.SyncTaskResponse;
+
+import java.util.List;
+
+/**
+ * 云平台数据同步服务
+ *
+ * @author fyc
+ * @date 2026-05-21
+ */
+public interface CloudSyncService {
+
+    List<SyncStatusResponse> listSyncStatus();
+
+    SyncTaskResponse triggerManualSync(String syncType, String direction);
+
+    SyncTaskResponse getTaskProgress(Long taskId);
+
+    void onOrgChanged();
+
+    void onUserChanged();
+}

+ 4 - 2
service-issue/service-issue-biz/src/main/java/com/usky/issue/service/CloudSyncTaskRunner.java

@@ -1,5 +1,6 @@
 package com.usky.issue.service;
 
+import com.usky.issue.service.enums.SyncDirection;
 import com.usky.issue.service.enums.SyncTypeEnum;
 
 /**
@@ -10,7 +11,8 @@ import com.usky.issue.service.enums.SyncTypeEnum;
  */
 public interface CloudSyncTaskRunner {
 
-    com.usky.issue.domain.IssueSyncTask createTask(Long configId, SyncTypeEnum type, String triggerMode);
+    com.usky.issue.domain.IssueSyncTask createTask(Long configId, SyncTypeEnum type, String triggerMode,
+                                                   SyncDirection direction);
 
-    void executeSyncTask(Long taskId, SyncTypeEnum type);
+    void executeSyncTask(Long taskId, SyncTypeEnum type, SyncDirection direction);
 }

+ 7 - 1
service-issue/service-issue-biz/src/main/java/com/usky/issue/service/LocalPullService.java

@@ -13,5 +13,11 @@ public interface LocalPullService {
 
     SyncResult pullAndApply(String tableName, IssueCloudConfig config);
 
-    SyncResult pullByTenant(Integer tenantId, String tableName, IssueCloudConfig config);
+    default SyncResult pullByTenant(Integer tenantId, String tableName, IssueCloudConfig config) {
+        return pullByTenant(tenantId, tableName, config, false);
+    }
+
+    SyncResult pullByTenant(Integer tenantId, String tableName, IssueCloudConfig config, boolean forceBackfill);
+
+    void resetDownCursor(Integer tenantId, String tableName);
 }

+ 1 - 1
service-issue/service-issue-biz/src/main/java/com/usky/issue/service/LocalSyncAgent.java

@@ -15,5 +15,5 @@ public interface LocalSyncAgent {
 
     SyncResult executeSync(SyncTypeEnum syncType, IssueCloudConfig config, SyncDirection direction, boolean fullSync);
 
-    int countPending(SyncTypeEnum syncType, IssueCloudConfig config);
+    int countPending(SyncTypeEnum syncType, IssueCloudConfig config, SyncDirection direction);
 }

+ 33 - 10
service-issue/service-issue-biz/src/main/java/com/usky/issue/service/client/CloudPlatformClient.java

@@ -4,10 +4,12 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.usky.issue.domain.IssueCloudConfig;
 import com.usky.issue.service.constant.CloudIntegrationConstants;
+import com.usky.issue.service.support.CloudEntitySupport;
 import com.usky.issue.service.util.AesGcmCipher;
 import com.usky.issue.service.util.CredentialMaskUtil;
 import com.usky.issue.service.util.HmacSignUtil;
 import com.usky.issue.service.vo.CloudConnectionTestResult;
+import com.usky.issue.service.vo.CloudPollResult;
 import com.usky.issue.service.vo.SyncPacket;
 import com.usky.issue.service.vo.SyncResponse;
 import lombok.extern.slf4j.Slf4j;
@@ -101,7 +103,8 @@ public class CloudPlatformClient {
         }
         try {
             String credential = aesGcmCipher.decrypt(config.getCredentialKey());
-            return testConnection(config.getCloudAddress(), config.getTenantId(), credential, config.getToken());
+            String token = CloudEntitySupport.resolveAccessToken(config);
+            return testConnection(config.getCloudAddress(), config.getTenantId(), credential, token);
         } catch (Exception ex) {
             log.error("凭证解密失败 tenantId={}", config.getTenantId(), ex);
             return CloudConnectionTestResult.failure(false, "凭证解密失败");
@@ -128,7 +131,8 @@ public class CloudPlatformClient {
         }
     }
 
-    public SyncPacket pollCloud(Integer tenantId, String tableName, Long lastVersion, IssueCloudConfig config) {
+    public CloudPollResult pollCloud(Integer tenantId, String tableName, Long lastVersion,
+                                     IssueCloudConfig config, boolean forceBackfill) {
         try {
             String credential = aesGcmCipher.decrypt(config.getCredentialKey());
             HttpHeaders headers = buildAuthHeaders(config, credential);
@@ -137,22 +141,41 @@ public class CloudPlatformClient {
                     .queryParam("tenantId", tenantId)
                     .queryParam("tableName", tableName)
                     .queryParam("lastVersion", lastVersion == null ? 0L : lastVersion)
+                    .queryParam("forceBackfill", forceBackfill)
                     .toUriString();
 
+            log.info("[pollCloud] 请求 url={}, tenantId={}, tableName={}, lastVersion={}, forceBackfill={}, hasToken={}",
+                    url, tenantId, tableName, lastVersion, forceBackfill,
+                    StringUtils.hasText(CloudEntitySupport.resolveAccessToken(config)));
+
             ResponseEntity<String> response = restTemplate.exchange(
                     url, HttpMethod.GET, new HttpEntity<>(headers), String.class);
-            if (!StringUtils.hasText(response.getBody())) {
-                return null;
+            String body = response.getBody();
+            if (!StringUtils.hasText(body)) {
+                return CloudPollResult.builder().requestFailed(true).errorMessage("云端返回空响应").build();
+            }
+            log.info("[pollCloud] 响应 status={}, bodyLength={}", response.getStatusCodeValue(), body.length());
+
+            JsonNode json = OBJECT_MAPPER.readTree(body);
+            int code = json.has("code") ? json.get("code").asInt(200) : 200;
+            String msg = json.has("msg") ? json.get("msg").asText() : null;
+            if (code != 200) {
+                boolean authFailed = msg != null && msg.contains("认证");
+                return CloudPollResult.builder()
+                        .authFailed(authFailed)
+                        .requestFailed(true)
+                        .errorMessage(StringUtils.hasText(msg) ? msg : "poll 失败 code=" + code)
+                        .build();
             }
-            JsonNode json = OBJECT_MAPPER.readTree(response.getBody());
             JsonNode dataNode = json.has("data") ? json.get("data") : json;
             if (dataNode == null || dataNode.isNull()) {
-                return null;
+                return CloudPollResult.builder().packet(null).build();
             }
-            return OBJECT_MAPPER.treeToValue(dataNode, SyncPacket.class);
+            SyncPacket packet = OBJECT_MAPPER.treeToValue(dataNode, SyncPacket.class);
+            return CloudPollResult.builder().packet(packet).build();
         } catch (Exception e) {
-            log.error("轮询云端失败", e);
-            return null;
+            log.error("[pollCloud] 轮询云端失败 tenantId={}, tableName={}", tenantId, tableName, e);
+            return CloudPollResult.builder().requestFailed(true).errorMessage(e.getMessage()).build();
         }
     }
 
@@ -176,7 +199,7 @@ public class CloudPlatformClient {
     }
 
     private HttpHeaders buildAuthHeaders(IssueCloudConfig config, String credential) {
-        return buildAuthHeaders(config.getTenantId(), credential, config.getToken());
+        return buildAuthHeaders(config.getTenantId(), credential, CloudEntitySupport.resolveAccessToken(config));
     }
 
     private HttpHeaders buildAuthHeaders(Integer tenantId, String credential, String token) {

+ 8 - 0
service-issue/service-issue-biz/src/main/java/com/usky/issue/service/constant/CloudIntegrationConstants.java

@@ -43,5 +43,13 @@ public final class CloudIntegrationConstants {
     public static final String OPERATION_DISABLE = "DISABLE_CONFIG";
     public static final String OPERATION_ENABLE = "ENABLE_CONFIG";
     public static final String OPERATION_MANUAL_SYNC = "MANUAL_SYNC";
+
+    /** 云平台数据源(sys_user、issue_sync_* 等) */
+    public static final String DS_CLOUD = "usky-cloud";
+
+    public static final String MSG_NO_SYNC_DATA =
+            "未拉取到待同步数据:请确认云端 sys_user 存在该租户数据,且 poll 认证通过(type=USER 对应表 sys_user)";
+    public static final String MSG_POLL_AUTH_FAILED = "云端 poll 认证失败,请检查凭证密钥是否与云端配置一致";
+    public static final String PATH_SYNC_DIAGNOSE = "/sync/diagnose";
     public static final String OPERATION_SAVE_TRIGGER = "SAVE_TRIGGER";
 }

+ 5 - 1
service-issue/service-issue-biz/src/main/java/com/usky/issue/service/enums/SyncDirection.java

@@ -21,8 +21,12 @@ public enum SyncDirection {
     private final String desc;
 
     public static SyncDirection fromCode(String code) {
+        if (code == null) {
+            throw new IllegalArgumentException("未知同步方向: null");
+        }
+        String normalized = code.trim();
         for (SyncDirection direction : values()) {
-            if (direction.code.equalsIgnoreCase(code)) {
+            if (direction.code.equalsIgnoreCase(normalized)) {
                 return direction;
             }
         }

+ 2 - 1
service-issue/service-issue-biz/src/main/java/com/usky/issue/service/impl/CloudConfigServiceImpl.java

@@ -146,7 +146,7 @@ public class CloudConfigServiceImpl implements CloudConfigService {
 
         CloudConnectionTestResult result = cloudPlatformClient.testConnection(
                 request.getCloudAddress(), request.getTenantId(), request.getCredentialKey(),
-                SecurityUtils.getToken());
+                CloudEntitySupport.resolveAccessToken());
 
         log.info("[连接测试] 远程结果 success={}, networkReachable={}, authValid={}, tenantExists={}, message={}, costTime={}ms",
                 result.isSuccess(), result.isNetworkReachable(), result.isAuthValid(),
@@ -364,6 +364,7 @@ public class CloudConfigServiceImpl implements CloudConfigService {
         resp.setUpdatedBy(config.getUpdatedBy());
         resp.setCreatedTime(config.getCreatedTime());
         resp.setCreatedBy(config.getCreatedBy());
+        resp.setCloudAddress(config.getCloudAddress());
         return resp;
     }
 }

+ 39 - 19
service-issue/service-issue-biz/src/main/java/com/usky/issue/service/impl/CloudSyncPublishServiceImpl.java

@@ -14,6 +14,7 @@ import com.usky.issue.service.support.CloudEntitySupport;
 import com.usky.issue.service.sync.SyncPayloadSupport;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 import org.springframework.util.StringUtils;
@@ -27,6 +28,7 @@ import java.util.List;
  * @date 2026-06-29
  */
 @Slf4j
+@DS("usky-cloud")
 @Service
 public class CloudSyncPublishServiceImpl implements CloudSyncPublishService {
 
@@ -36,8 +38,10 @@ public class CloudSyncPublishServiceImpl implements CloudSyncPublishService {
     private IssueSyncDownQueueMapper downQueueMapper;
     @Autowired
     private SysUserMapper sysUserMapper;
+    @Lazy
+    @Autowired
+    private CloudSyncPublishService self;
 
-    @DS("usky-cloud")
     @Override
     @Transactional(rollbackFor = Exception.class)
     public void enqueueUser(SysUser user, String operation) {
@@ -77,43 +81,59 @@ public class CloudSyncPublishServiceImpl implements CloudSyncPublishService {
             queue.setStatus(0);
             CloudEntitySupport.fillOnInsert(queue);
             downQueueMapper.insert(queue);
-            log.info("用户已入下发队列 userId={}, tenantId={}, operation={}",
+            log.info("用户已从 sys_user 入下发队列 userId={}, tenantId={}, operation={}",
                     user.getUserId(), user.getTenantId(), op);
         } catch (Exception e) {
             throw new BusinessException("用户入下发队列失败: " + e.getMessage());
         }
     }
 
-    @DS("usky-cloud")
     @Override
     @Transactional(rollbackFor = Exception.class)
-    public void backfillUsersIfNeeded(Integer tenantId, Long lastVersion) {
+    public int backfillUsersIfNeeded(Integer tenantId, Long lastVersion, boolean force) {
         if (tenantId == null) {
-            return;
+            return 0;
         }
-        if (lastVersion != null && lastVersion > 0) {
-            return;
+        if (!force && lastVersion != null && lastVersion > 0) {
+            return 0;
         }
         String tableName = SyncTypeEnum.USER.getCode();
-        Integer pending = downQueueMapper.selectCount(new LambdaQueryWrapper<IssueSyncDownQueue>()
-                .eq(IssueSyncDownQueue::getTenantId, tenantId)
-                .eq(IssueSyncDownQueue::getTableName, tableName)
-                .eq(IssueSyncDownQueue::getStatus, 0)
-                .eq(IssueSyncDownQueue::getIsDeleted, 0));
-        if (pending != null && pending > 0) {
-            return;
+        if (!force) {
+            Integer pending = downQueueMapper.selectCount(new LambdaQueryWrapper<IssueSyncDownQueue>()
+                    .eq(IssueSyncDownQueue::getTenantId, tenantId)
+                    .eq(IssueSyncDownQueue::getTableName, tableName)
+                    .eq(IssueSyncDownQueue::getStatus, 0)
+                    .eq(IssueSyncDownQueue::getIsDeleted, 0));
+            if (pending != null && pending > 0) {
+                log.info("租户{}下发队列已有{}条待同步,跳过 sys_user 回填", tenantId, pending);
+                return 0;
+            }
         }
 
         List<SysUser> users = sysUserMapper.selectList(new LambdaQueryWrapper<SysUser>()
                 .eq(SysUser::getTenantId, tenantId)
-                .eq(SysUser::getDelFlag, "0"));
+                .and(w -> w.eq(SysUser::getDelFlag, "0").or().isNull(SysUser::getDelFlag)));
         if (users.isEmpty()) {
-            log.debug("租户{}无用户数据可回填", tenantId);
-            return;
+            log.warn("租户{}在 sys_user 无有效用户(tenant_id={} 且 del_flag=0)", tenantId, tenantId);
+            return 0;
         }
-        log.info("开始回填用户下发队列 tenantId={}, count={}", tenantId, users.size());
+        log.info("从 sys_user 回填下发队列 tenantId={}, userCount={}, force={}", tenantId, users.size(), force);
+        int enqueued = 0;
         for (SysUser user : users) {
-            enqueueUser(user, "INSERT");
+            self.enqueueUser(user, "INSERT");
+            enqueued++;
         }
+        return enqueued;
+    }
+
+    @Override
+    public int countSysUsers(Integer tenantId) {
+        if (tenantId == null) {
+            return 0;
+        }
+        Integer count = sysUserMapper.selectCount(new LambdaQueryWrapper<SysUser>()
+                .eq(SysUser::getTenantId, tenantId)
+                .and(w -> w.eq(SysUser::getDelFlag, "0").or().isNull(SysUser::getDelFlag)));
+        return count == null ? 0 : count;
     }
 }

+ 5 - 2
service-issue/service-issue-biz/src/main/java/com/usky/issue/service/impl/CloudSyncReceiveServiceImpl.java

@@ -102,13 +102,15 @@ public class CloudSyncReceiveServiceImpl implements CloudSyncReceiveService {
 
     @DS("usky-cloud")
     @Override
-    public SyncPacket pollDownQueue(String tenantId, String tableName, Long lastVersion) {
+    public SyncPacket pollDownQueue(String tenantId, String tableName, Long lastVersion, boolean forceBackfill) {
         if (lastVersion == null) {
             lastVersion = 0L;
         }
         Integer tenant = Integer.valueOf(tenantId);
         if (SyncTypeEnum.USER.getCode().equalsIgnoreCase(tableName)) {
-            cloudSyncPublishService.backfillUsersIfNeeded(tenant, lastVersion);
+            int backfilled = cloudSyncPublishService.backfillUsersIfNeeded(tenant, lastVersion, forceBackfill);
+            log.info("[poll] USER 类型从 sys_user 回填 {} 条, tenantId={}, lastVersion={}, force={}",
+                    backfilled, tenantId, lastVersion, forceBackfill);
         }
         List<IssueSyncDownQueue> queueList = downQueueMapper.selectList(new LambdaQueryWrapper<IssueSyncDownQueue>()
                 .eq(IssueSyncDownQueue::getTenantId, tenant)
@@ -120,6 +122,7 @@ public class CloudSyncReceiveServiceImpl implements CloudSyncReceiveService {
                 .last("LIMIT " + POLL_BATCH_SIZE));
 
         if (queueList.isEmpty()) {
+            log.info("[poll] 下发队列为空 tenantId={}, tableName={}, lastVersion={}", tenantId, tableName, lastVersion);
             return null;
         }
 

+ 19 - 8
service-issue/service-issue-biz/src/main/java/com/usky/issue/service/impl/CloudSyncServiceImpl.java

@@ -12,6 +12,7 @@ import com.usky.issue.domain.IssueSyncTask;
 import com.usky.issue.domain.IssueTriggerEvent;
 import com.usky.issue.service.vo.SyncStatusResponse;
 import com.usky.issue.service.vo.SyncTaskResponse;
+import com.usky.issue.service.enums.SyncDirection;
 import com.usky.issue.service.enums.SyncTypeEnum;
 import com.usky.issue.service.enums.TriggerEventCodeEnum;
 import com.usky.issue.mapper.IssueSyncStatusMapper;
@@ -77,7 +78,7 @@ public class CloudSyncServiceImpl implements CloudSyncService {
     }
 
     @Override
-    public SyncTaskResponse triggerManualSync(String syncType) {
+    public SyncTaskResponse triggerManualSync(String syncType, String direction) {
         String operator = CloudEntitySupport.resolveOperator();
         String requestIp = CloudEntitySupport.resolveRequestIp();
         SyncTypeEnum typeEnum;
@@ -86,23 +87,33 @@ public class CloudSyncServiceImpl implements CloudSyncService {
         } catch (IllegalArgumentException ex) {
             throw new BusinessException("无效的同步类型: " + syncType);
         }
+        SyncDirection directionEnum;
+        try {
+            directionEnum = SyncDirection.fromCode(direction);
+        } catch (IllegalArgumentException ex) {
+            throw new BusinessException("无效的同步方向: " + direction);
+        }
+        if (directionEnum == SyncDirection.BOTH) {
+            throw new BusinessException("手动同步请指定 UP 或 DOWN 方向");
+        }
         IssueCloudConfig config = cloudConfigService.requireActiveConfig();
-        String lockKey = CloudIntegrationConstants.LOCK_SYNC_PREFIX + typeEnum.getCode();
+        String lockKey = CloudIntegrationConstants.LOCK_SYNC_PREFIX + typeEnum.getCode() + ":" + directionEnum.getCode();
         String lockValue = UUID.randomUUID().toString();
         boolean locked = redisHelper.lock(lockKey, lockValue, CloudIntegrationConstants.LOCK_SYNC_SECONDS);
         if (!locked) {
             throw new BusinessException("该类型同步正在进行中,请稍后再试");
         }
         try {
-            IssueSyncTask task = cloudSyncTaskRunner.createTask(config.getId(), typeEnum, "MANUAL");
+            IssueSyncTask task = cloudSyncTaskRunner.createTask(config.getId(), typeEnum, "MANUAL", directionEnum);
             operationLogService.log(config.getId(), CloudIntegrationConstants.OPERATION_MANUAL_SYNC,
-                    "手动同步 type=" + typeEnum.getCode(), operator, requestIp);
+                    "手动同步 type=" + typeEnum.getCode() + " direction=" + directionEnum.getCode(),
+                    operator, requestIp);
             redisHelper.delete(CloudIntegrationConstants.CACHE_STATUS_PREFIX + config.getId());
             if (typeEnum.isAsync()) {
-                cloudSyncExecutor.execute(() -> cloudSyncTaskRunner.executeSyncTask(task.getId(), typeEnum));
+                cloudSyncExecutor.execute(() -> cloudSyncTaskRunner.executeSyncTask(task.getId(), typeEnum, directionEnum));
                 return toTaskResponse(syncTaskMapper.selectById(task.getId()));
             }
-            cloudSyncTaskRunner.executeSyncTask(task.getId(), typeEnum);
+            cloudSyncTaskRunner.executeSyncTask(task.getId(), typeEnum, directionEnum);
             return toTaskResponse(syncTaskMapper.selectById(task.getId()));
         } finally {
             redisHelper.unlockLua(lockKey, lockValue);
@@ -143,8 +154,8 @@ public class CloudSyncServiceImpl implements CloudSyncService {
         if (event == null || event.getEnabled() == null || event.getEnabled() == 0) {
             return;
         }
-        IssueSyncTask task = cloudSyncTaskRunner.createTask(config.getId(), syncType, "EVENT");
-        cloudSyncExecutor.execute(() -> cloudSyncTaskRunner.executeSyncTask(task.getId(), syncType));
+        IssueSyncTask task = cloudSyncTaskRunner.createTask(config.getId(), syncType, "EVENT", SyncDirection.BOTH);
+        cloudSyncExecutor.execute(() -> cloudSyncTaskRunner.executeSyncTask(task.getId(), syncType, SyncDirection.BOTH));
     }
 
     private List<SyncStatusResponse> loadStatusFromDb(Long configId) {

+ 27 - 12
service-issue/service-issue-biz/src/main/java/com/usky/issue/service/impl/CloudSyncTaskRunnerImpl.java

@@ -1,6 +1,5 @@
 package com.usky.issue.service.impl;
 
-import com.baomidou.dynamic.datasource.annotation.DS;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.usky.issue.domain.IssueSyncDetail;
 import com.usky.issue.domain.IssueSyncStatus;
@@ -31,7 +30,6 @@ import java.time.LocalDateTime;
  * @author fyc
  * @date 2026-05-21
  */
-@DS("usky-cloud")
 @Service
 public class CloudSyncTaskRunnerImpl implements CloudSyncTaskRunner {
 
@@ -50,9 +48,9 @@ public class CloudSyncTaskRunnerImpl implements CloudSyncTaskRunner {
 
     @Override
     @Transactional(rollbackFor = Exception.class)
-    public IssueSyncTask createTask(Long configId, SyncTypeEnum type, String triggerMode) {
+    public IssueSyncTask createTask(Long configId, SyncTypeEnum type, String triggerMode, SyncDirection direction) {
         IssueCloudConfig config = configMapper.selectById(configId);
-        int pending = localSyncAgent.countPending(type, config);
+        int pending = localSyncAgent.countPending(type, config, direction);
         IssueSyncTask task = new IssueSyncTask();
         task.setConfigId(configId);
         if (config != null) {
@@ -72,7 +70,7 @@ public class CloudSyncTaskRunnerImpl implements CloudSyncTaskRunner {
 
     @Override
     @Transactional(rollbackFor = Exception.class)
-    public void executeSyncTask(Long taskId, SyncTypeEnum type) {
+    public void executeSyncTask(Long taskId, SyncTypeEnum type, SyncDirection direction) {
         IssueSyncTask task = syncTaskMapper.selectById(taskId);
         if (task == null) {
             return;
@@ -86,24 +84,39 @@ public class CloudSyncTaskRunnerImpl implements CloudSyncTaskRunner {
         syncTaskMapper.updateById(task);
 
         IssueCloudConfig config = configMapper.selectById(task.getConfigId());
-        SyncResult syncResult = localSyncAgent.executeSync(type, config, SyncDirection.BOTH, false);
+        boolean fullSync = "MANUAL".equals(task.getTriggerMode()) && direction == SyncDirection.DOWN;
+        SyncResult syncResult = localSyncAgent.executeSync(type, config, direction, fullSync);
 
         IssueSyncTask current = syncTaskMapper.selectById(taskId);
         if (current != null && TaskStatusEnum.CANCELLED.getCode().equals(current.getTaskStatus())) {
             return;
         }
 
-        task.setTotalCount(Math.max(task.getTotalCount(), syncResult.getTotalCount()));
+        int total = Math.max(task.getTotalCount() == null ? 0 : task.getTotalCount(), syncResult.getTotalCount());
+        task.setTotalCount(total);
         task.setProcessedCount(syncResult.getTotalCount());
         task.setSuccessCount(syncResult.getSuccessCount());
         task.setFailureCount(syncResult.getFailureCount());
-        task.setTaskStatus(syncResult.getFailureCount() > 0 && syncResult.getSuccessCount() == 0
-                ? TaskStatusEnum.FAILED.getCode() : TaskStatusEnum.SUCCESS.getCode());
-        if (syncResult.getFailureCount() > 0) {
+
+        if (syncResult.getFailureCount() > 0 && syncResult.getSuccessCount() == 0) {
+            task.setTaskStatus(TaskStatusEnum.FAILED.getCode());
             task.setErrorSummary(syncResult.getErrorMessage() != null
                     ? syncResult.getErrorMessage()
-                    : "部分失败: " + syncResult.getFailureCount() + " 条");
+                    : "同步失败: " + syncResult.getFailureCount() + " 条");
+        } else if (syncResult.getTotalCount() == 0) {
+            task.setTaskStatus(TaskStatusEnum.SUCCESS.getCode());
+            task.setErrorSummary(syncResult.getErrorMessage() != null
+                    ? syncResult.getErrorMessage()
+                    : CloudIntegrationConstants.MSG_NO_SYNC_DATA);
+        } else {
+            task.setTaskStatus(TaskStatusEnum.SUCCESS.getCode());
+            if (syncResult.getFailureCount() > 0) {
+                task.setErrorSummary(syncResult.getErrorMessage() != null
+                        ? syncResult.getErrorMessage()
+                        : "部分失败: " + syncResult.getFailureCount() + " 条");
+            }
         }
+
         task.setFinishTime(LocalDateTime.now());
         CloudEntitySupport.fillOnUpdate(task);
         syncTaskMapper.updateById(task);
@@ -142,7 +155,9 @@ public class CloudSyncTaskRunnerImpl implements CloudSyncTaskRunner {
         status.setTotalCount(task.getTotalCount());
         status.setSuccessCount(task.getSuccessCount());
         status.setFailureCount(task.getFailureCount());
-        status.setUnsyncedCount(Math.max(0, task.getTotalCount() - task.getProcessedCount()));
+        status.setUnsyncedCount(Math.max(0,
+                (task.getTotalCount() == null ? 0 : task.getTotalCount())
+                        - (task.getProcessedCount() == null ? 0 : task.getProcessedCount())));
         status.setLastSyncTime(LocalDateTime.now());
         CloudEntitySupport.fillOnUpdate(status);
         syncStatusMapper.updateById(status);

+ 46 - 6
service-issue/service-issue-biz/src/main/java/com/usky/issue/service/impl/LocalPullServiceImpl.java

@@ -14,6 +14,9 @@ import com.usky.issue.service.client.CloudPlatformClient;
 import com.usky.issue.service.enums.SyncDirection;
 import com.usky.issue.service.support.CloudEntitySupport;
 import com.usky.issue.service.sync.SyncDataApplierRegistry;
+import com.usky.issue.service.sync.SyncPayloadSupport;
+import com.usky.issue.service.constant.CloudIntegrationConstants;
+import com.usky.issue.service.vo.CloudPollResult;
 import com.usky.issue.service.vo.SyncPacket;
 import com.usky.issue.service.vo.SyncResponse;
 import com.usky.issue.service.vo.SyncResult;
@@ -71,16 +74,39 @@ public class LocalPullServiceImpl implements LocalPullService {
     }
 
     @Override
-    public SyncResult pullByTenant(Integer tenantId, String tableName, IssueCloudConfig config) {
+    public SyncResult pullByTenant(Integer tenantId, String tableName, IssueCloudConfig config, boolean forceBackfill) {
         SyncResult result = SyncResult.empty();
         IssueSyncCursor cursor = getOrInitCursor(tenantId, tableName, SyncDirection.DOWN.getCode());
         Long lastVersion = cursor.getLastSyncVersion() == null ? 0L : cursor.getLastSyncVersion();
+        if (forceBackfill) {
+            lastVersion = 0L;
+            updateCursor(tenantId, tableName, SyncDirection.DOWN.getCode(), 0L);
+            log.info("手动全量拉取,已重置 DOWN 游标 tenantId={}, tableName={}", tenantId, tableName);
+        }
 
         while (true) {
-            SyncPacket packet = cloudPlatformClient.pollCloud(tenantId, tableName, lastVersion, config);
+            CloudPollResult pollResult = cloudPlatformClient.pollCloud(
+                    tenantId, tableName, lastVersion, config, forceBackfill);
+            if (pollResult.isAuthFailed()) {
+                result.setFailureCount(1);
+                result.setErrorMessage(CloudIntegrationConstants.MSG_POLL_AUTH_FAILED);
+                log.error("[pull] poll 认证失败 tenantId={}, tableName={}", tenantId, tableName);
+                break;
+            }
+            if (pollResult.isRequestFailed()) {
+                result.setFailureCount(1);
+                result.setErrorMessage(pollResult.getErrorMessage());
+                break;
+            }
+            SyncPacket packet = pollResult.getPacket();
             if (packet == null || packet.getDataList() == null || packet.getDataList().isEmpty()) {
+                if (result.getTotalCount() == 0) {
+                    log.warn("云端未返回待同步数据 tenantId={}, tableName={}(type=USER 对应 sys_user,数据经 issue_sync_down_queue 下发)",
+                            tenantId, tableName);
+                }
                 break;
             }
+            forceBackfill = false;
 
             List<IssueSyncDownQueue> queueList = convertQueueList(packet.getDataList());
             List<Long> ackIds = new ArrayList<>();
@@ -114,8 +140,15 @@ public class LocalPullServiceImpl implements LocalPullService {
     }
 
     private void applyToLocal(IssueSyncDownQueue queue) throws Exception {
-        IssueSyncData data = OBJECT_MAPPER.readValue(queue.getPayload(), IssueSyncData.class);
-        if (data.getDataKey() == null) {
+        String rawPayload = queue.getPayload();
+        IssueSyncData data;
+        try {
+            data = OBJECT_MAPPER.readValue(rawPayload, IssueSyncData.class);
+        } catch (Exception ex) {
+            data = new IssueSyncData();
+            data.setPayload(rawPayload);
+        }
+        if (!StringUtils.hasText(data.getDataKey())) {
             data.setDataKey(queue.getTableName() + "-" + queue.getId());
         }
         data.setTenantId(queue.getTenantId());
@@ -141,11 +174,13 @@ public class LocalPullServiceImpl implements LocalPullService {
     }
 
     private void applyToBusinessTable(IssueSyncDownQueue queue, IssueSyncData data) throws Exception {
-        if (!StringUtils.hasText(data.getPayload())) {
+        String rawPayload = StringUtils.hasText(data.getPayload()) ? data.getPayload() : queue.getPayload();
+        if (!StringUtils.hasText(rawPayload)) {
             return;
         }
+        String businessPayload = SyncPayloadSupport.resolveBusinessPayload(rawPayload);
         String operation = StringUtils.hasText(queue.getOperation()) ? queue.getOperation() : "UPDATE";
-        syncDataApplierRegistry.apply(queue.getTenantId(), queue.getTableName(), operation, data.getPayload());
+        syncDataApplierRegistry.apply(queue.getTenantId(), queue.getTableName(), operation, businessPayload);
     }
 
     private void ackCloud(List<Long> queueIds, IssueCloudConfig config) {
@@ -195,4 +230,9 @@ public class LocalPullServiceImpl implements LocalPullService {
                 .set(IssueSyncCursor::getLastSyncVersion, version)
                 .set(IssueSyncCursor::getLastSyncTime, LocalDateTime.now()));
     }
+
+    @Override
+    public void resetDownCursor(Integer tenantId, String tableName) {
+        updateCursor(tenantId, tableName, SyncDirection.DOWN.getCode(), 0L);
+    }
 }

+ 21 - 8
service-issue/service-issue-biz/src/main/java/com/usky/issue/service/impl/LocalSyncAgentImpl.java

@@ -63,19 +63,32 @@ public class LocalSyncAgentImpl implements LocalSyncAgent {
             result.merge(pushService.pushByTenant(config.getTenantId(), tableName, config));
         }
         if (direction == SyncDirection.DOWN || direction == SyncDirection.BOTH) {
-            result.merge(pullService.pullByTenant(config.getTenantId(), tableName, config));
+            if (fullSync) {
+                pullService.resetDownCursor(config.getTenantId(), tableName);
+            }
+            result.merge(pullService.pullByTenant(config.getTenantId(), tableName, config, fullSync));
         }
         return result;
     }
 
     @Override
-    public int countPending(SyncTypeEnum syncType, IssueCloudConfig config) {
+    public int countPending(SyncTypeEnum syncType, IssueCloudConfig config, SyncDirection direction) {
+        if (config == null || config.getTenantId() == null) {
+            return 0;
+        }
         String tableName = syncType.getCode();
-        Integer upCount = syncDataMapper.selectCount(new LambdaQueryWrapper<IssueSyncData>()
-                .eq(IssueSyncData::getTenantId, config.getTenantId())
-                .eq(IssueSyncData::getTableName, tableName)
-                .in(IssueSyncData::getSyncStatus, 0, 2)
-                .eq(IssueSyncData::getIsDeleted, 0));
-        return cloudSyncReceiveService.countPendingDownQueue(config.getTenantId(), tableName);
+        int count = 0;
+        if (direction == SyncDirection.UP || direction == SyncDirection.BOTH) {
+            Integer upCount = syncDataMapper.selectCount(new LambdaQueryWrapper<IssueSyncData>()
+                    .eq(IssueSyncData::getTenantId, config.getTenantId())
+                    .eq(IssueSyncData::getTableName, tableName)
+                    .in(IssueSyncData::getSyncStatus, 0, 2)
+                    .eq(IssueSyncData::getIsDeleted, 0));
+            count += upCount == null ? 0 : upCount;
+        }
+        if (direction == SyncDirection.DOWN || direction == SyncDirection.BOTH) {
+            count += cloudSyncReceiveService.countPendingDownQueue(config.getTenantId(), tableName);
+        }
+        return count;
     }
 }

+ 46 - 0
service-issue/service-issue-biz/src/main/java/com/usky/issue/service/support/CloudEntitySupport.java

@@ -3,6 +3,7 @@ package com.usky.issue.service.support;
 import com.baomidou.mybatisplus.core.toolkit.StringUtils;
 import com.usky.common.security.utils.SecurityUtils;
 import com.usky.issue.domain.CloudAuditEntity;
+import com.usky.issue.domain.IssueCloudConfig;
 import org.springframework.web.context.request.RequestContextHolder;
 import org.springframework.web.context.request.ServletRequestAttributes;
 
@@ -32,6 +33,51 @@ public final class CloudEntitySupport {
         return attrs.getRequest().getRemoteAddr();
     }
 
+    /**
+     * 解析当前请求的访问令牌,用于转发至云端 HTTP 调用(Authorization: Bearer)
+     */
+    public static String resolveAccessToken() {
+        String fromHeader = resolveAuthorizationHeader();
+        if (StringUtils.isNotBlank(fromHeader)) {
+            return stripBearerPrefix(fromHeader);
+        }
+        try {
+            String fromSecurity = SecurityUtils.getToken();
+            if (StringUtils.isNotBlank(fromSecurity)) {
+                return stripBearerPrefix(fromSecurity);
+            }
+        } catch (Exception ignored) {
+            // 非 Web 上下文或无登录态
+        }
+        return null;
+    }
+
+    public static String resolveAccessToken(IssueCloudConfig config) {
+        if (config != null && StringUtils.isNotBlank(config.getToken())) {
+            return stripBearerPrefix(config.getToken());
+        }
+        return resolveAccessToken();
+    }
+
+    private static String resolveAuthorizationHeader() {
+        ServletRequestAttributes attrs = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
+        if (attrs == null) {
+            return null;
+        }
+        return attrs.getRequest().getHeader("Authorization");
+    }
+
+    private static String stripBearerPrefix(String token) {
+        if (token == null) {
+            return null;
+        }
+        String trimmed = token.trim();
+        if (trimmed.regionMatches(true, 0, "Bearer ", 0, 7)) {
+            return trimmed.substring(7).trim();
+        }
+        return trimmed;
+    }
+
     public static void fillOnInsert(CloudAuditEntity entity) {
         LocalDateTime now = LocalDateTime.now();
         if (entity.getTenantId() == null) {

+ 1 - 0
service-issue/service-issue-biz/src/main/java/com/usky/issue/service/vo/CloudConfigResponse.java

@@ -25,4 +25,5 @@ public class CloudConfigResponse {
     private String updatedBy;
     private String createdBy;
     private LocalDateTime createdTime;
+    private String cloudAddress;
 }

+ 25 - 0
service-issue/service-issue-biz/src/main/java/com/usky/issue/service/vo/CloudPollResult.java

@@ -0,0 +1,25 @@
+package com.usky.issue.service.vo;
+
+import lombok.Builder;
+import lombok.Data;
+
+/**
+ * 云端 poll 调用结果
+ *
+ * @author fyc
+ * @date 2026-06-29
+ */
+@Data
+@Builder
+public class CloudPollResult {
+
+    private SyncPacket packet;
+
+    /** 认证失败 */
+    private boolean authFailed;
+
+    /** 网络或解析异常 */
+    private boolean requestFailed;
+
+    private String errorMessage;
+}

+ 6 - 0
service-issue/service-issue-biz/src/main/java/com/usky/issue/service/vo/ManualSyncRequest.java

@@ -3,6 +3,7 @@ package com.usky.issue.service.vo;
 import lombok.Data;
 
 import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.Pattern;
 
 /**
  * 手动同步请求
@@ -15,4 +16,9 @@ public class ManualSyncRequest {
 
     @NotBlank(message = "同步类型不能为空")
     private String type;
+
+    /** 同步方向:UP=本地→云端,DOWN=云端→本地 */
+    @NotBlank(message = "同步方向不能为空")
+    @Pattern(regexp = "UP|DOWN", flags = Pattern.Flag.CASE_INSENSITIVE, message = "同步方向仅支持 UP 或 DOWN")
+    private String direction;
 }

+ 3 - 0
service-issue/service-issue-biz/src/main/java/com/usky/issue/service/vo/SyncPacket.java

@@ -1,5 +1,6 @@
 package com.usky.issue.service.vo;
 
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.usky.issue.service.enums.SyncDirection;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -18,6 +19,7 @@ import java.util.List;
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
+@JsonIgnoreProperties(ignoreUnknown = true)
 public class SyncPacket {
 
     private String tenantId;
@@ -27,4 +29,5 @@ public class SyncPacket {
     private Long batchVersion;
     private Boolean hasMore;
     private String nonce;
+
 }