Pārlūkot izejas kodu

增加视频推流服务

hanzhengyi 2 nedēļas atpakaļ
vecāks
revīzija
d95b61edf9
21 mainītis faili ar 2052 papildinājumiem un 60 dzēšanām
  1. 53 0
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/controller/SipPushController.java
  2. 0 15
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/controller/SipVideoController.java
  3. 31 2
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/config/Gb28181VideoProperties.java
  4. 4 2
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/enums/MqttTopics.java
  5. 69 0
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/impl/SipRtspPushService.java
  6. 144 0
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/impl/VideoDataTransferService.java
  7. 179 41
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/impl/VideoStreamPushService.java
  8. 32 0
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/sip/config/SipClientProperties.java
  9. 679 0
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/sip/device/Gb28181DeviceSimulator.java
  10. 22 0
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/sip/device/Gb28181StreamResult.java
  11. 65 0
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/sip/ps/Gb28181PsMuxer.java
  12. 162 0
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/sip/ps/H264AccessUnitBuilder.java
  13. 257 0
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/sip/push/RtspGb28181RtpSender.java
  14. 90 0
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/sip/push/RtspH264RtpSender.java
  15. 28 0
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/sip/push/SipVideoPusherFactory.java
  16. 78 0
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/sip/rtp/AnnexBParser.java
  17. 46 0
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/sip/rtp/H264RtpPacketizer.java
  18. 35 0
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/sip/rtp/PsRtpPacketizer.java
  19. 29 0
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/sip/rtp/RtpPacketBuilder.java
  20. 10 0
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/sip/sdp/SdpUtils.java
  21. 39 0
      service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/sip/util/SipDigestAuth.java

+ 53 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/controller/SipPushController.java

@@ -0,0 +1,53 @@
+package com.usky.cdi.controller;
+
+import com.usky.cdi.service.impl.SipRtspPushService;
+import lombok.RequiredArgsConstructor;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.Map;
+
+/**
+ * GB28181 推流(海康摄像机模式):手动注册设备,传入 RTSP 后等平台点播。
+ */
+@RestController
+@RequestMapping("/api/sip")
+@RequiredArgsConstructor
+@ConditionalOnProperty(prefix = "sip.client", name = "enabled", havingValue = "true")
+public class SipPushController {
+
+    private final SipRtspPushService sipRtspPushService;
+
+    /** 手动注册设备到 GB28181 平台(deviceId 动态传入,20 位数字) */
+    @PostMapping("/device/register")
+    public Map<String, Object> registerDevice(@RequestParam String deviceId) {
+        return sipRtspPushService.registerDevice(deviceId);
+    }
+
+    /** 手动注销设备(REGISTER Expires=0) */
+    @PostMapping("/device/unregister")
+    public Map<String, Object> unregisterDevice() {
+        return sipRtspPushService.unregisterDevice();
+    }
+
+    /**
+     * 绑定 RTSP 视频源并等待平台 INVITE 推流(需先注册设备)。
+     * 调用后请在 GB28181 平台对设备发起「实时预览」。
+     */
+    @PostMapping("/push")
+    public Map<String, Object> pushRtsp(
+            @RequestParam String rtspUrl,
+            @RequestParam(required = false) Integer durationSeconds) {
+        return sipRtspPushService.pushRtspStream(rtspUrl, durationSeconds);
+    }
+
+    /** 设备注册状态(类似摄像机 GB28181 在线状态) */
+    @GetMapping("/device/status")
+    public Map<String, Object> deviceStatus() {
+        return sipRtspPushService.deviceStatus();
+    }
+}

+ 0 - 15
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/controller/SipVideoController.java

@@ -1,6 +1,5 @@
 package com.usky.cdi.controller;
 
-import com.usky.cdi.service.impl.VideoStreamPushService;
 import com.usky.cdi.service.sip.config.SipServerProperties;
 import com.usky.cdi.service.sip.model.VideoStreamSession;
 import com.usky.cdi.service.sip.service.VideoStreamManager;
@@ -22,7 +21,6 @@ public class SipVideoController {
 
     private final SipServerProperties properties;
     private final VideoStreamManager streamManager;
-    private final VideoStreamPushService videoStreamPushService;
 
     @GetMapping("/status")
     public Map<String, Object> status() {
@@ -63,19 +61,6 @@ public class SipVideoController {
         return result;
     }
 
-    /**
-     * 测试向本机 SIP 服务端推送 H264 视频流(模拟 GB28181 设备推流)
-     */
-    @PostMapping("/test/push")
-    public Map<String, Object> testPush(
-            @RequestParam(defaultValue = "127.0.0.1") String host,
-            @RequestParam(required = false) Integer port,
-            @RequestParam(defaultValue = "5") int durationSeconds,
-            @RequestParam(defaultValue = "34020000001320000001") String deviceId) {
-        int sipPort = port != null ? port : properties.getPort();
-        return videoStreamPushService.pushTestStream(host, sipPort, durationSeconds, deviceId);
-    }
-
     private Map<String, Object> toSessionMap(VideoStreamSession session) {
         Map<String, Object> map = new HashMap<>();
         map.put("sessionId", session.getSessionId());

+ 31 - 2
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/config/Gb28181VideoProperties.java

@@ -2,16 +2,45 @@ package com.usky.cdi.service.config;
 
 import lombok.Data;
 import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.stereotype.Component;
 
 @Data
-@Component
 @ConfigurationProperties(prefix = "gb28181.video")
 public class Gb28181VideoProperties {
 
     private boolean enabled = false;
+
+    /** 市平台 MQTT */
     private Integer tenantId;
     private Long engineeringId;
     private String mqttUsername;
     private String mqttPassword;
+    private String mqttTopic = "base/videoDevice";
+    private String mqttStreamTopic = "iotInfo/videoStream";
+
+    /** 国标流媒体平台(WVP)SIP 参数 */
+    private String sipId = "34020000002000000001";
+    private String sipDomain = "3402000000";
+    private String sipIp = "114.80.201.142";
+    private int sipPort = 15060;
+    private int cascadePort = 15061;
+    private int rtpPortMin = 30100;
+    private int rtpPortMax = 30500;
+
+    /** WVP HTTP API */
+    private String apiBaseUrl = "http://114.80.201.142:18080";
+    private String apiUsername = "admin";
+    private String apiPassword = "admin";
+
+    /** 定时同步设备目录 cron */
+    private String syncCron = "0 0/30 * * * ?";
+
+    /** 同步时是否为在线通道拉取预览地址 */
+    private boolean fetchPlayUrl = false;
+
+    /** WVP 点播推流 + MQTT 上报播放地址 */
+    private boolean streamPushEnabled = false;
+    private String streamPushCron = "0 */5 * * * ?";
+    private boolean autoPushAllOnline = false;
+    private int maxConcurrentStreams = 10;
+    private String preferredProtocol = "flv";
 }

+ 4 - 2
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/enums/MqttTopics.java

@@ -18,7 +18,8 @@ public final class MqttTopics {
         ENGINEERING("base/engineering", "人防工程基础信息"),
         PROTECTIVE_UNIT("base/protectiveUnit", "防护单元基础信息"),
         FLOOR_PLANE("base/floorPlane", "楼层平面图信息"),
-        SENSOR_INFO("base/sensorInfo", "智能监管物联设施数据");
+        SENSOR_INFO("base/sensorInfo", "智能监管物联设施数据"),
+        VIDEO_DEVICE("base/videoDevice", "GB28181视频设备目录");
 
         private final String topic;
         private final String desc;
@@ -74,7 +75,8 @@ public final class MqttTopics {
         // 人员闯入情况
         PERSON_PRESENCE("iotInfo/personPresence", "人员闯入情况"),
         // 用电负荷
-        ELECTRICITY_LOAD("iotInfo/electricityLoad", "人防用电负荷");
+        ELECTRICITY_LOAD("iotInfo/electricityLoad", "人防用电负荷"),
+        VIDEO_STREAM("iotInfo/videoStream", "GB28181视频流地址");
 
         private final String topic;
         private final String desc;

+ 69 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/impl/SipRtspPushService.java

@@ -0,0 +1,69 @@
+package com.usky.cdi.service.impl;
+
+import com.usky.cdi.service.sip.config.SipClientProperties;
+import com.usky.cdi.service.sip.device.Gb28181DeviceSimulator;
+import com.usky.cdi.service.sip.device.Gb28181StreamResult;
+import com.usky.common.core.exception.BusinessException;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Service;
+import org.springframework.util.StringUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * GB28181 SIP 模拟摄像机推流:手动 REGISTER + RTSP → PS/RTP。
+ */
+@Slf4j
+@Service
+@RequiredArgsConstructor
+@ConditionalOnProperty(prefix = "sip.client", name = "enabled", havingValue = "true")
+public class SipRtspPushService {
+
+    private final SipClientProperties properties;
+    private final Gb28181DeviceSimulator deviceSimulator;
+
+    public Map<String, Object> pushRtspStream(String rtspUrl, Integer durationSeconds) {
+        if (!StringUtils.hasText(rtspUrl)) {
+            throw new BusinessException("rtspUrl 不能为空");
+        }
+        int duration = durationSeconds != null ? durationSeconds : properties.getDefaultDurationSeconds();
+        Gb28181StreamResult result = deviceSimulator.bindRtspAndWait(
+                rtspUrl, properties.getInviteWaitSeconds(), duration);
+        return toMap(result);
+    }
+
+    public Map<String, Object> registerDevice(String deviceId) {
+        try {
+            return deviceSimulator.register(deviceId);
+        } catch (IllegalArgumentException | IllegalStateException e) {
+            throw new BusinessException(e.getMessage());
+        }
+    }
+
+    public Map<String, Object> unregisterDevice() {
+        return deviceSimulator.unregister();
+    }
+
+    public Map<String, Object> deviceStatus() {
+        return deviceSimulator.status();
+    }
+
+    private Map<String, Object> toMap(Gb28181StreamResult result) {
+        Map<String, Object> map = new HashMap<>();
+        map.put("success", result.isSuccess());
+        map.put("rtspUrl", result.getRtspUrl());
+        map.put("deviceId", deviceSimulator.status().get("deviceId"));
+        map.put("serverHost", properties.getServerHost());
+        map.put("serverPort", properties.getServerPort());
+        map.put("callId", result.getCallId());
+        map.put("localRtpPort", result.getLocalRtpPort());
+        map.put("remoteRtpHost", result.getRemoteRtpHost());
+        map.put("remoteRtpPort", result.getRemoteRtpPort());
+        map.put("packetsSent", result.getPacketsSent());
+        map.put("errors", result.getErrors());
+        return map;
+    }
+}

+ 144 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/impl/VideoDataTransferService.java

@@ -0,0 +1,144 @@
+package com.usky.cdi.service.impl;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.usky.cdi.service.CdiDeliveryLogService;
+import com.usky.cdi.service.config.Gb28181VideoProperties;
+import com.usky.cdi.service.enums.MqttTopics;
+import com.usky.cdi.service.mqtt.MqttConnectionTool;
+import com.usky.cdi.service.util.Gb28181MqttCredentialResolver;
+import com.usky.cdi.service.util.Gb28181VideoPlatformClient;
+import com.usky.cdi.service.util.SnowflakeIdGenerator;
+import com.usky.cdi.service.vo.video.VideoChannelVO;
+import com.usky.cdi.service.vo.video.VideoDeviceItemVO;
+import com.usky.cdi.service.vo.video.VideoDeviceSyncPacketVO;
+import com.usky.common.core.exception.BusinessException;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Service;
+import org.springframework.util.StringUtils;
+
+import javax.annotation.PostConstruct;
+import java.text.SimpleDateFormat;
+import java.time.LocalDateTime;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * GB28181 视频设备目录同步:WVP HTTP API 拉取 → MQTT 上报市平台。
+ */
+@Slf4j
+@Service
+@RequiredArgsConstructor
+@ConditionalOnProperty(prefix = "gb28181.video", name = "enabled", havingValue = "true")
+public class VideoDataTransferService {
+
+    private static final int DELIVERY_DATA_TYPE = 5;
+
+    @Value("${snowflake.worker-id:4}")
+    private long workerId;
+
+    @Value("${snowflake.data-center-id:4}")
+    private long dataCenterId;
+
+    private final Gb28181VideoProperties properties;
+    private final Gb28181VideoPlatformClient platformClient;
+    private final MqttConnectionTool mqttConnectionTool;
+    private final CdiDeliveryLogService cdiDeliveryLogService;
+    private final Gb28181MqttCredentialResolver mqttCredentialResolver;
+
+    private SnowflakeIdGenerator idGenerator;
+    private final SimpleDateFormat timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+
+    @PostConstruct
+    public void init() {
+        this.idGenerator = new SnowflakeIdGenerator(workerId, dataCenterId);
+    }
+
+    public JSONObject checkConfiguration() {
+        return platformClient.checkConfiguration();
+    }
+
+    public void synchronizeVideoDevices() {
+        Gb28181MqttCredentialResolver.MqttCredentials credentials = mqttCredentialResolver.resolve();
+        long startTime = System.currentTimeMillis();
+        int total = 0;
+        int failure = 0;
+
+        log.info("开始同步 GB28181 视频设备目录: engineeringId={}", credentials.getEngineeringId());
+        List<VideoDeviceItemVO> devices = platformClient.queryAllDevicesWithChannels();
+
+        if (properties.isFetchPlayUrl()) {
+            enrichPlayUrls(devices);
+        }
+
+        for (VideoDeviceItemVO device : devices) {
+            if (device.getChannels() != null) {
+                total += device.getChannels().size();
+            } else {
+                total++;
+            }
+        }
+
+        String topic = resolveDeviceTopic();
+        VideoDeviceSyncPacketVO packet = buildDeviceSyncPacket(credentials.getEngineeringId(), devices);
+        String jsonMessage = JSON.toJSONString(packet);
+
+        try {
+            MqttConnectionTool.MqttGateway gateway = mqttConnectionTool.connectOrRefresh(
+                    credentials.getUsername(), credentials.getPassword());
+            gateway.sendToMqtt(topic, jsonMessage);
+            log.info("视频设备目录 MQTT 上报成功: topic={}, devices={}, channels={}",
+                    topic, devices.size(), total);
+        } catch (Exception e) {
+            failure = total;
+            log.error("视频设备目录 MQTT 上报失败: {}", e.getMessage(), e);
+            throw new BusinessException("视频设备目录 MQTT 上报失败: " + e.getMessage());
+        } finally {
+            long endTime = System.currentTimeMillis();
+            int success = total - failure;
+            cdiDeliveryLogService.saveLog(topic, MqttTopics.Base.VIDEO_DEVICE.getDesc(), DELIVERY_DATA_TYPE,
+                    credentials.getTenantId(), credentials.getEngineeringId(), LocalDateTime.now(),
+                    startTime, endTime, total, success, failure, 0, failure > 0 ? 0 : 1, "自动同步");
+        }
+    }
+
+    private void enrichPlayUrls(List<VideoDeviceItemVO> devices) {
+        for (VideoDeviceItemVO device : devices) {
+            if (device.getChannels() == null) {
+                continue;
+            }
+            for (VideoChannelVO channel : device.getChannels()) {
+                if (!"ON".equalsIgnoreCase(channel.getStatus())) {
+                    continue;
+                }
+                try {
+                    channel.setPlayUrl(platformClient.queryPlayUrl(device.getDeviceId(), channel.getChannelId()));
+                } catch (Exception e) {
+                    log.warn("获取预览地址失败 deviceId={} channelId={}: {}",
+                            device.getDeviceId(), channel.getChannelId(), e.getMessage());
+                }
+            }
+        }
+    }
+
+    private VideoDeviceSyncPacketVO buildDeviceSyncPacket(Long engineeringId, List<VideoDeviceItemVO> devices) {
+        VideoDeviceSyncPacketVO packet = new VideoDeviceSyncPacketVO();
+        packet.setDataPacketID(idGenerator.nextPacketId());
+        packet.setEngineeringID(engineeringId);
+        packet.setPublishTime(timeFormat.format(new Date()));
+        packet.setPlatformSipId(properties.getSipId());
+        packet.setPlatformSipDomain(properties.getSipDomain());
+        packet.setDevices(devices);
+        return packet;
+    }
+
+    private String resolveDeviceTopic() {
+        if (StringUtils.hasText(properties.getMqttTopic())) {
+            return properties.getMqttTopic();
+        }
+        return MqttTopics.Base.VIDEO_DEVICE.getTopic();
+    }
+}

+ 179 - 41
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/impl/VideoStreamPushService.java

@@ -1,71 +1,209 @@
 package com.usky.cdi.service.impl;
 
-import com.usky.cdi.service.sip.config.SipClientProperties;
-import com.usky.cdi.service.sip.device.Gb28181DeviceSimulator;
-import com.usky.cdi.service.sip.device.Gb28181StreamResult;
+import com.alibaba.fastjson.JSON;
+import com.usky.cdi.service.CdiDeliveryLogService;
+import com.usky.cdi.service.config.Gb28181VideoProperties;
+import com.usky.cdi.service.enums.MqttTopics;
+import com.usky.cdi.service.mqtt.MqttConnectionTool;
+import com.usky.cdi.service.util.Gb28181MqttCredentialResolver;
+import com.usky.cdi.service.util.Gb28181VideoPlatformClient;
+import com.usky.cdi.service.util.SnowflakeIdGenerator;
+import com.usky.cdi.service.vo.video.VideoChannelVO;
+import com.usky.cdi.service.vo.video.VideoDeviceItemVO;
+import com.usky.cdi.service.vo.video.VideoPlayInfoVO;
+import com.usky.cdi.service.vo.video.VideoStreamItemVO;
+import com.usky.cdi.service.vo.video.VideoStreamSyncPacketVO;
 import com.usky.common.core.exception.BusinessException;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.stereotype.Service;
 import org.springframework.util.StringUtils;
 
-import java.util.HashMap;
+import javax.annotation.PostConstruct;
+import java.text.SimpleDateFormat;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
+/**
+ * WVP 点播推流:调用平台 API 拉流,并通过 MQTT 上报播放地址。
+ */
 @Slf4j
 @Service
 @RequiredArgsConstructor
-@ConditionalOnProperty(prefix = "sip.client", name = "enabled", havingValue = "true")
+@ConditionalOnProperty(prefix = "gb28181.video", name = "enabled", havingValue = "true")
 public class VideoStreamPushService {
 
-    private final SipClientProperties properties;
-    private final Gb28181DeviceSimulator deviceSimulator;
+    private static final int DELIVERY_DATA_TYPE = 6;
 
-    /**
-     * 绑定 RTSP 视频源,等待平台 INVITE 后推流(海康 GB28181 摄像机模式)。
-     */
-    public Map<String, Object> pushRtspStream(String rtspUrl, Integer durationSeconds) {
-        if (!StringUtils.hasText(rtspUrl)) {
-            throw new BusinessException("rtspUrl 不能为空");
+    @Value("${snowflake.worker-id:5}")
+    private long workerId;
+
+    @Value("${snowflake.data-center-id:5}")
+    private long dataCenterId;
+
+    private final Gb28181VideoProperties properties;
+    private final Gb28181VideoPlatformClient platformClient;
+    private final MqttConnectionTool mqttConnectionTool;
+    private final CdiDeliveryLogService cdiDeliveryLogService;
+    private final Gb28181MqttCredentialResolver mqttCredentialResolver;
+
+    private final Map<String, VideoPlayInfoVO> activeSessions = new ConcurrentHashMap<>();
+    private SnowflakeIdGenerator idGenerator;
+    private final SimpleDateFormat timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+
+    @PostConstruct
+    public void init() {
+        this.idGenerator = new SnowflakeIdGenerator(workerId, dataCenterId);
+    }
+
+    public VideoPlayInfoVO pushStream(String deviceId, String channelId) {
+        validateIds(deviceId, channelId);
+        VideoPlayInfoVO playInfo = platformClient.startPlay(deviceId, channelId);
+        activeSessions.put(sessionKey(deviceId, channelId), playInfo);
+        log.info("WVP 点播成功: deviceId={}, channelId={}, url={}",
+                deviceId, channelId, playInfo.getPreferredPlayUrl(properties.getPreferredProtocol()));
+        return playInfo;
+    }
+
+    public VideoStreamSyncPacketVO pushAllOnlineStreams() {
+        Gb28181MqttCredentialResolver.MqttCredentials credentials = mqttCredentialResolver.resolve();
+        long startTime = System.currentTimeMillis();
+        List<VideoStreamItemVO> streamItems = new ArrayList<>();
+        int total = 0;
+        int failure = 0;
+
+        List<VideoDeviceItemVO> devices = platformClient.queryAllDevicesWithChannels();
+        int max = Math.max(1, properties.getMaxConcurrentStreams());
+        int pushed = 0;
+
+        for (VideoDeviceItemVO device : devices) {
+            if (!"ON".equalsIgnoreCase(device.getStatus()) || device.getChannels() == null) {
+                continue;
+            }
+            for (VideoChannelVO channel : device.getChannels()) {
+                if (!"ON".equalsIgnoreCase(channel.getStatus()) || pushed >= max) {
+                    continue;
+                }
+                total++;
+                try {
+                    VideoPlayInfoVO playInfo = pushStream(device.getDeviceId(), channel.getChannelId());
+                    streamItems.add(toStreamItem(device, channel, playInfo));
+                    pushed++;
+                } catch (Exception e) {
+                    failure++;
+                    log.warn("点播失败 deviceId={} channelId={}: {}",
+                            device.getDeviceId(), channel.getChannelId(), e.getMessage());
+                    streamItems.add(toFailedStreamItem(device, channel, e.getMessage()));
+                }
+            }
+        }
+
+        VideoStreamSyncPacketVO packet = buildStreamSyncPacket(credentials.getEngineeringId(), streamItems);
+        publishStreamPacket(credentials, packet, startTime, total, failure);
+        return packet;
+    }
+
+    public void maintainActiveStreams() {
+        if (!properties.isStreamPushEnabled()) {
+            return;
         }
-        int duration = durationSeconds != null ? durationSeconds : properties.getDefaultDurationSeconds();
-        Gb28181StreamResult result = deviceSimulator.bindRtspAndWait(
-                rtspUrl, properties.getInviteWaitSeconds(), duration);
-        return toMap(result);
+        if (properties.isAutoPushAllOnline()) {
+            pushAllOnlineStreams();
+            return;
+        }
+        log.debug("当前 WVP 活跃推流会话数: {}", activeSessions.size());
+    }
+
+    public void stopStream(String deviceId, String channelId) {
+        validateIds(deviceId, channelId);
+        platformClient.stopPlay(deviceId, channelId);
+        VideoPlayInfoVO removed = activeSessions.remove(sessionKey(deviceId, channelId));
+        if (removed != null) {
+            removed.setStreamStatus("STOPPED");
+        }
+        log.info("已停止 WVP 点播: deviceId={}, channelId={}", deviceId, channelId);
     }
 
-    public Map<String, Object> registerDevice(String deviceId) {
+    public Map<String, VideoPlayInfoVO> getActiveSessions() {
+        return Collections.unmodifiableMap(new LinkedHashMap<>(activeSessions));
+    }
+
+    private void publishStreamPacket(Gb28181MqttCredentialResolver.MqttCredentials credentials,
+                                     VideoStreamSyncPacketVO packet, long startTime,
+                                     int total, int failure) {
+        String topic = resolveStreamTopic();
+        String jsonMessage = JSON.toJSONString(packet);
         try {
-            return deviceSimulator.register(deviceId);
-        } catch (IllegalArgumentException e) {
-            throw new BusinessException(e.getMessage());
-        } catch (IllegalStateException e) {
-            throw new BusinessException(e.getMessage());
+            MqttConnectionTool.MqttGateway gateway = mqttConnectionTool.connectOrRefresh(
+                    credentials.getUsername(), credentials.getPassword());
+            gateway.sendToMqtt(topic, jsonMessage);
+            log.info("视频流 MQTT 上报成功: topic={}, streams={}", topic, packet.getStreams().size());
+        } catch (Exception e) {
+            failure = total;
+            log.error("视频流 MQTT 上报失败: {}", e.getMessage(), e);
+            throw new BusinessException("视频流 MQTT 上报失败: " + e.getMessage());
+        } finally {
+            long endTime = System.currentTimeMillis();
+            int success = total - failure;
+            cdiDeliveryLogService.saveLog(topic, MqttTopics.IotInfo.VIDEO_STREAM.getDesc(), DELIVERY_DATA_TYPE,
+                    credentials.getTenantId(), credentials.getEngineeringId(), LocalDateTime.now(),
+                    startTime, endTime, total, success, failure, 0, failure > 0 ? 0 : 1, "自动同步");
         }
     }
 
-    public Map<String, Object> unregisterDevice() {
-        return deviceSimulator.unregister();
+    private VideoStreamSyncPacketVO buildStreamSyncPacket(Long engineeringId, List<VideoStreamItemVO> streams) {
+        VideoStreamSyncPacketVO packet = new VideoStreamSyncPacketVO();
+        packet.setDataPacketID(idGenerator.nextPacketId());
+        packet.setEngineeringID(engineeringId);
+        packet.setPublishTime(timeFormat.format(new Date()));
+        packet.setMediaServerIp(properties.getSipIp());
+        packet.setStreams(streams);
+        return packet;
+    }
+
+    private VideoStreamItemVO toStreamItem(VideoDeviceItemVO device, VideoChannelVO channel, VideoPlayInfoVO playInfo) {
+        VideoStreamItemVO item = new VideoStreamItemVO();
+        item.setDeviceId(device.getDeviceId());
+        item.setDeviceName(device.getDeviceName());
+        item.setChannelId(channel.getChannelId());
+        item.setChannelName(channel.getChannelName());
+        item.setStreamStatus(playInfo.getStreamStatus());
+        item.setPlayUrl(playInfo.getPreferredPlayUrl(properties.getPreferredProtocol()));
+        item.setFlvUrl(playInfo.getFlvUrl());
+        item.setHlsUrl(playInfo.getHlsUrl());
+        item.setRtspUrl(playInfo.getRtspUrl());
+        return item;
+    }
+
+    private VideoStreamItemVO toFailedStreamItem(VideoDeviceItemVO device, VideoChannelVO channel, String message) {
+        VideoStreamItemVO item = toStreamItem(device, channel, new VideoPlayInfoVO());
+        item.setStreamStatus("FAILED");
+        item.setPlayUrl(message);
+        return item;
     }
 
-    public Map<String, Object> deviceStatus() {
-        return deviceSimulator.status();
+    private static void validateIds(String deviceId, String channelId) {
+        if (!StringUtils.hasText(deviceId) || !StringUtils.hasText(channelId)) {
+            throw new BusinessException("deviceId 与 channelId 不能为空");
+        }
     }
 
-    private Map<String, Object> toMap(Gb28181StreamResult result) {
-        Map<String, Object> map = new HashMap<>();
-        map.put("success", result.isSuccess());
-        map.put("rtspUrl", result.getRtspUrl());
-        map.put("deviceId", deviceSimulator.status().get("deviceId"));
-        map.put("serverHost", properties.getServerHost());
-        map.put("serverPort", properties.getServerPort());
-        map.put("callId", result.getCallId());
-        map.put("localRtpPort", result.getLocalRtpPort());
-        map.put("remoteRtpHost", result.getRemoteRtpHost());
-        map.put("remoteRtpPort", result.getRemoteRtpPort());
-        map.put("packetsSent", result.getPacketsSent());
-        map.put("errors", result.getErrors());
-        return map;
+    private static String sessionKey(String deviceId, String channelId) {
+        return deviceId + "_" + channelId;
+    }
+
+    private String resolveStreamTopic() {
+        if (StringUtils.hasText(properties.getMqttStreamTopic())) {
+            return properties.getMqttStreamTopic();
+        }
+        return MqttTopics.IotInfo.VIDEO_STREAM.getTopic();
     }
 }

+ 32 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/sip/config/SipClientProperties.java

@@ -0,0 +1,32 @@
+package com.usky.cdi.service.sip.config;
+
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+/**
+ * GB28181 配置(与海康摄像机 Web 配置项对应)
+ */
+@Data
+@Component
+@ConfigurationProperties(prefix = "sip.client")
+public class SipClientProperties {
+
+    private boolean enabled = true;
+    /** 启动时自动注册(默认关闭,改用手动 API) */
+    private boolean autoStart = false;
+
+    private String serverHost = "114.80.201.142";
+    private int serverPort = 15060;
+    private String domain = "3402000000";
+    private String platformId = "34020000002000000001";
+    /** 仅作兼容保留,实际设备 ID 由注册接口动态传入 */
+    private String deviceId;
+    private String password = "jkjj_wlgz";
+    private String sipTransport = "tcp";
+    private int registerExpires = 3600;
+
+    private int inviteWaitSeconds = 300;
+    private int defaultDurationSeconds = 60;
+    private String rtspTransport = "tcp";
+}

+ 679 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/sip/device/Gb28181DeviceSimulator.java

@@ -0,0 +1,679 @@
+package com.usky.cdi.service.sip.device;
+
+import com.usky.cdi.service.sip.config.SipClientProperties;
+import com.usky.cdi.service.sip.model.SdpMediaInfo;
+import com.usky.cdi.service.sip.push.RtspGb28181RtpSender;
+import com.usky.cdi.service.sip.sdp.SdpUtils;
+import com.usky.cdi.service.sip.util.NetworkUtils;
+import com.usky.cdi.service.sip.util.SipDigestAuth;
+import gov.nist.javax.sip.RequestEventExt;
+import gov.nist.javax.sip.SipStackImpl;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.context.event.EventListener;
+import org.springframework.stereotype.Service;
+import org.springframework.util.StringUtils;
+
+import javax.annotation.PreDestroy;
+import javax.sip.*;
+import javax.sip.address.Address;
+import javax.sip.address.AddressFactory;
+import javax.sip.address.SipURI;
+import javax.sip.header.*;
+import javax.sip.message.MessageFactory;
+import javax.sip.message.Request;
+import javax.sip.message.Response;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * 模拟海康摄像机 GB28181 接入:手动 REGISTER,平台点播 INVITE 后从 RTSP 推流。
+ */
+@Slf4j
+@Service
+@ConditionalOnProperty(prefix = "sip.client", name = "enabled", havingValue = "true")
+public class Gb28181DeviceSimulator implements SipListener {
+
+    private static final String GB28181_SSRC = "0100000001";
+
+    private final SipClientProperties properties;
+    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(
+            r -> new Thread(r, "gb28181-device"));
+
+    private SipStack sipStack;
+    private SipProvider sipProvider;
+    private AddressFactory addressFactory;
+    private MessageFactory messageFactory;
+    private HeaderFactory headerFactory;
+
+    private String sipTransport;
+    private String localIp;
+    private int localSipPort;
+    private int cseq = 1;
+
+    private final AtomicBoolean registered = new AtomicBoolean(false);
+    private final AtomicBoolean registerAuthSent = new AtomicBoolean(false);
+    private final AtomicReference<WWWAuthenticateHeader> authChallenge = new AtomicReference<>();
+    private final AtomicReference<String> boundRtspUrl = new AtomicReference<>();
+    private final AtomicReference<PendingStream> pendingStream = new AtomicReference<>();
+    private volatile Future<?> streamingTask;
+    private volatile ScheduledFuture<?> reRegisterTask;
+    private volatile String activeDeviceId;
+
+    public Gb28181DeviceSimulator(SipClientProperties properties) {
+        this.properties = properties;
+    }
+
+    @EventListener(ApplicationReadyEvent.class)
+    public void onReady() {
+        if (properties.isAutoStart()) {
+            log.warn("sip.client.auto-start=true 已废弃,请使用 POST /api/sip/device/register?deviceId=...");
+        } else {
+            log.info("GB28181 设备模拟就绪,请手动调用 POST /api/sip/device/register?deviceId=... 注册");
+        }
+    }
+
+    @PreDestroy
+    public void destroy() {
+        cancelReRegister();
+        try {
+            unregisterInternal(true);
+        } catch (Exception e) {
+            log.debug("销毁时注销失败: {}", e.getMessage());
+        }
+        scheduler.shutdownNow();
+    }
+
+    /**
+     * 手动注册到 GB28181 平台。
+     *
+     * @param deviceId 20 位设备编码(动态传入)
+     */
+    public synchronized Map<String, Object> register(String deviceId) {
+        validateDeviceId(deviceId);
+        if (registered.get() && deviceId.equals(activeDeviceId)) {
+            log.info("设备已注册: deviceId={}", activeDeviceId);
+            return status();
+        }
+        if (sipStack != null) {
+            unregisterInternal(true);
+        }
+        try {
+            activeDeviceId = deviceId.trim();
+            initStack();
+            registerAuthSent.set(false);
+            authChallenge.set(null);
+            sendRegister(false, properties.getRegisterExpires());
+            scheduleReRegister();
+            log.info("GB28181 注册请求已发送: deviceId={}, platform={}@{}:{}, transport={}",
+                    activeDeviceId, properties.getPlatformId(),
+                    properties.getServerHost(), properties.getServerPort(), sipTransport);
+        } catch (Exception e) {
+            activeDeviceId = null;
+            shutdownStack();
+            sipStack = null;
+            sipProvider = null;
+            throw new IllegalStateException("GB28181 注册失败: " + e.getMessage(), e);
+        }
+        return status();
+    }
+
+    /** 手动注销(REGISTER Expires=0)并释放 SIP 栈。 */
+    public synchronized Map<String, Object> unregister() {
+        unregisterInternal(true);
+        return status();
+    }
+
+    private void unregisterInternal(boolean sendExpiresZero) {
+        cancelReRegister();
+        if (streamingTask != null) {
+            streamingTask.cancel(true);
+            streamingTask = null;
+        }
+        if (sendExpiresZero && sipStack != null && StringUtils.hasText(activeDeviceId)) {
+            try {
+                registerAuthSent.set(false);
+                sendRegister(registered.get(), 0);
+                log.info("GB28181 注销请求已发送: deviceId={}", activeDeviceId);
+            } catch (Exception e) {
+                log.warn("GB28181 注销请求发送失败: {}", e.getMessage());
+            }
+        }
+        registered.set(false);
+        activeDeviceId = null;
+        boundRtspUrl.set(null);
+        pendingStream.set(null);
+        shutdownStack();
+        sipStack = null;
+        sipProvider = null;
+    }
+
+    /**
+     * 绑定 RTSP 并等待平台点播(与海康摄像机:配置好平台后,平台发 INVITE 取流)。
+     */
+    public Gb28181StreamResult bindRtspAndWait(String rtspUrl, int inviteWaitSeconds, int streamDurationSeconds) {
+        if (!StringUtils.hasText(rtspUrl)) {
+            return fail("rtspUrl 不能为空");
+        }
+        if (!registered.get() || !StringUtils.hasText(activeDeviceId)) {
+            return fail("设备未注册,请先调用 POST /api/sip/device/register?deviceId=...");
+        }
+        if (sipStack == null) {
+            return fail("SIP 栈未就绪,请重新注册设备");
+        }
+
+        PendingStream session = new PendingStream(rtspUrl, streamDurationSeconds);
+        pendingStream.set(session);
+        boundRtspUrl.set(rtspUrl);
+
+        log.info("已绑定 RTSP,等待平台 INVITE: deviceId={}, wait={}s", activeDeviceId, inviteWaitSeconds);
+        log.info(">>> 请在 GB28181 平台对设备 {} 点击「实时预览」", activeDeviceId);
+
+        try {
+            if (!session.inviteReady.await(inviteWaitSeconds, TimeUnit.SECONDS)) {
+                pendingStream.compareAndSet(session, null);
+                return fail("等待平台 INVITE 超时,请在平台对设备 " + activeDeviceId + " 发起实时预览");
+            }
+            long streamWait = streamDurationSeconds + 15L;
+            if (!session.streamDone.await(streamWait, TimeUnit.SECONDS)) {
+                return fail("推流超时");
+            }
+            return session.result != null ? session.result : fail("推流未返回结果");
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            return fail("等待被中断");
+        }
+    }
+
+    public Map<String, Object> status() {
+        Map<String, Object> map = new HashMap<>();
+        map.put("registered", registered.get());
+        map.put("deviceId", activeDeviceId);
+        map.put("platformId", properties.getPlatformId());
+        map.put("serverHost", properties.getServerHost());
+        map.put("serverPort", properties.getServerPort());
+        map.put("domain", properties.getDomain());
+        map.put("sipTransport", sipTransport);
+        map.put("localSipAddress", localIp != null ? localIp + ":" + localSipPort : null);
+        map.put("boundRtspUrl", boundRtspUrl.get());
+        map.put("streaming", streamingTask != null && !streamingTask.isDone());
+        return map;
+    }
+
+    private void initStack() throws Exception {
+        SipFactory sipFactory = SipFactory.getInstance();
+        sipFactory.setPathName("gov.nist");
+        java.util.Properties stackProps = new java.util.Properties();
+        stackProps.setProperty("javax.sip.STACK_NAME", "UskyGb28181Device");
+        stackProps.setProperty("gov.nist.javax.sip.TRACE_LEVEL", "0");
+
+        localIp = NetworkUtils.resolveLocalIp(null);
+        sipTransport = normalizeTransport(properties.getSipTransport());
+        stackProps.setProperty("javax.sip.OUTBOUND_PROXY",
+                properties.getServerHost() + ":" + properties.getServerPort() + "/" + sipTransport);
+
+        sipStack = sipFactory.createSipStack(stackProps);
+        addressFactory = sipFactory.createAddressFactory();
+        messageFactory = sipFactory.createMessageFactory();
+        headerFactory = sipFactory.createHeaderFactory();
+
+        localSipPort = findAvailablePort();
+        ListeningPoint lp = sipStack.createListeningPoint(localIp, localSipPort, sipTransport);
+        sipProvider = sipStack.createSipProvider(lp);
+        sipProvider.addSipListener(this);
+    }
+
+    private void scheduleReRegister() {
+        cancelReRegister();
+        long interval = Math.max(60, (long) (properties.getRegisterExpires() * 0.85));
+        reRegisterTask = scheduler.scheduleAtFixedRate(() -> {
+            if (sipStack != null && StringUtils.hasText(activeDeviceId)) {
+                try {
+                    registerAuthSent.set(false);
+                    sendRegister(registered.get(), properties.getRegisterExpires());
+                } catch (Exception e) {
+                    log.warn("GB28181 续注册失败: {}", e.getMessage());
+                }
+            }
+        }, interval, interval, TimeUnit.SECONDS);
+    }
+
+    private void cancelReRegister() {
+        if (reRegisterTask != null) {
+            reRegisterTask.cancel(false);
+            reRegisterTask = null;
+        }
+    }
+
+    private void sendRegister(boolean withAuth, int expires) throws Exception {
+        ensureActiveDeviceId();
+        SipURI requestUri = createUri(activeDeviceId, properties.getDomain());
+        requestUri.setPort(properties.getServerPort());
+        requestUri.setTransportParam(sipTransport);
+
+        Request register = buildRequest(Request.REGISTER, requestUri, activeDeviceId, "reg-tag");
+        register.addHeader(headerFactory.createExpiresHeader(expires));
+
+        if (withAuth) {
+            AuthorizationHeader auth = buildAuthorization(Request.REGISTER, requestUri.toString());
+            if (auth != null) {
+                register.addHeader(auth);
+            }
+        }
+
+        ClientTransaction ct = sipProvider.getNewClientTransaction(register);
+        ct.sendRequest();
+        log.debug("发送 REGISTER{} -> {}:{}", withAuth ? "(鉴权)" : "", properties.getServerHost(), properties.getServerPort());
+    }
+
+    private void handleIncomingInvite(RequestEvent event) throws Exception {
+        Request request = event.getRequest();
+        PendingStream session = pendingStream.get();
+        String rtspUrl = session != null ? session.rtspUrl : boundRtspUrl.get();
+        if (!StringUtils.hasText(rtspUrl)) {
+            sendResponse(event, Response.BUSY_HERE);
+            log.warn("收到平台 INVITE 但未绑定 RTSP,请先调用 POST /api/sip/push?rtspUrl=...");
+            return;
+        }
+
+        String sdpBody = extractSdp(request);
+        if (!StringUtils.hasText(sdpBody)) {
+            sendResponse(event, Response.BAD_REQUEST);
+            failPending(fail("平台 INVITE 无 SDP"));
+            return;
+        }
+
+        log.info("平台 INVITE SDP:\n{}", sdpBody);
+        SdpMediaInfo media = SdpUtils.parseVideoMedia(sdpBody);
+        if (media == null) {
+            sendResponse(event, Response.NOT_ACCEPTABLE);
+            log.warn("平台 INVITE SDP 解析失败,无法识别 video m= 行");
+            failPending(fail("平台 INVITE SDP 无效"));
+            return;
+        }
+
+        if (session == null) {
+            session = new PendingStream(rtspUrl, properties.getDefaultDurationSeconds());
+            pendingStream.set(session);
+        }
+
+        int localRtpPort = findAvailablePort();
+        String remoteHost = resolveRemoteRtpHost(media, event);
+
+        String answerSdp = SdpUtils.buildGb28181AnswerSdp(
+                localIp, localRtpPort, media, activeDeviceId, GB28181_SSRC);
+
+        log.info("应答 SDP:\n{}", answerSdp);
+        Response ok = messageFactory.createResponse(Response.OK, request);
+        ToHeader toHeader = (ToHeader) ok.getHeader(ToHeader.NAME);
+        toHeader.setTag(generateTag());
+        ok.addHeader(createContactHeader());
+        Header subjectHeader = request.getHeader("Subject");
+        if (subjectHeader != null) {
+            ok.addHeader(subjectHeader);
+        }
+        ok.setContent(answerSdp, headerFactory.createContentTypeHeader("application", "sdp"));
+
+        ServerTransaction st = event.getServerTransaction();
+        if (st == null) {
+            st = sipProvider.getNewServerTransaction(request);
+        }
+        st.sendResponse(ok);
+
+        CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME);
+        session.callId = callIdHeader != null ? callIdHeader.getCallId() : null;
+        session.localRtpPort = localRtpPort;
+        session.remoteRtpHost = remoteHost;
+        session.remoteRtpPort = media.getPort();
+        if (!StringUtils.hasText(media.getSsrc())) {
+            media.setSsrc(GB28181_SSRC);
+        }
+        session.media = media;
+        session.rtspUrl = rtspUrl;
+
+        log.info("已应答平台 INVITE 200 OK: rtp={}:{}, protocol={}, codec={}, setup={}",
+                remoteHost, media.getPort(), media.getMediaProtocol(), media.getCodec(), media.getSetup());
+    }
+
+    private String extractSdp(Request request) {
+        byte[] raw = request.getRawContent();
+        if (raw != null && raw.length > 0) {
+            return new String(raw, StandardCharsets.UTF_8);
+        }
+        Object content = request.getContent();
+        if (content instanceof byte[]) {
+            return new String((byte[]) content, StandardCharsets.UTF_8);
+        }
+        return content != null ? content.toString() : null;
+    }
+
+    private String resolveRemoteRtpHost(SdpMediaInfo media, RequestEvent event) {
+        String host = media.getConnectionAddress();
+        if (host != null && !"0.0.0.0".equals(host)) {
+            return host;
+        }
+        if (event instanceof RequestEventExt) {
+            RequestEventExt ext = (RequestEventExt) event;
+            if (StringUtils.hasText(ext.getRemoteIpAddress())) {
+                return ext.getRemoteIpAddress();
+            }
+        }
+        return properties.getServerHost();
+    }
+
+    private void handleIncomingAck(RequestEvent event) {
+        PendingStream session = pendingStream.get();
+        if (session == null || session.streamStarted) {
+            return;
+        }
+        session.streamStarted = true;
+        session.inviteReady.countDown();
+        log.info("收到平台 ACK,开始 RTSP 推流: {}", maskUrl(session.rtspUrl));
+
+        streamingTask = scheduler.submit(() -> {
+            try {
+                long packets = RtspGb28181RtpSender.stream(
+                        session.rtspUrl,
+                        session.media,
+                        session.remoteRtpHost,
+                        session.remoteRtpPort,
+                        session.localRtpPort,
+                        session.durationSeconds,
+                        properties.getRtspTransport());
+                session.result = Gb28181StreamResult.builder()
+                        .success(packets > 0)
+                        .rtspUrl(session.rtspUrl)
+                        .callId(session.callId)
+                        .localRtpPort(session.localRtpPort)
+                        .remoteRtpHost(session.remoteRtpHost)
+                        .remoteRtpPort(session.remoteRtpPort)
+                        .packetsSent(packets)
+                        .errors(packets > 0 ? new ArrayList<>() : singleError(
+                                "未发出 RTP 包,请检查 RTSP 地址/网络或是否收到 IDR 关键帧"))
+                        .build();
+                if (packets > 0) {
+                    log.info("推流完成: packets={}", packets);
+                } else {
+                    log.warn("推流结束但未发送 RTP 包");
+                }
+            } catch (Exception e) {
+                log.error("推流失败: {}", e.getMessage(), e);
+                session.result = fail("推流失败: " + e.getMessage());
+            } finally {
+                session.streamDone.countDown();
+                pendingStream.compareAndSet(session, null);
+            }
+        });
+    }
+
+    private void handleIncomingBye(RequestEvent event) throws Exception {
+        log.info("收到平台 BYE");
+        if (streamingTask != null) {
+            streamingTask.cancel(true);
+        }
+        sendResponse(event, Response.OK);
+        PendingStream session = pendingStream.getAndSet(null);
+        if (session != null) {
+            session.inviteReady.countDown();
+            if (session.result == null) {
+                session.result = fail("平台 BYE 结束会话");
+            }
+            session.streamDone.countDown();
+        }
+    }
+
+    private Request buildRequest(String method, SipURI requestUri, String fromUser, String fromTag) throws Exception {
+        Address toAddress = addressFactory.createAddress(requestUri);
+        ToHeader toHeader = headerFactory.createToHeader(toAddress, null);
+
+        SipURI fromUri = createUri(fromUser, properties.getDomain());
+        fromUri.setPort(localSipPort);
+        fromUri.setTransportParam(sipTransport);
+        FromHeader fromHeader = headerFactory.createFromHeader(addressFactory.createAddress(fromUri), fromTag);
+
+        ContactHeader contactHeader = createContactHeader();
+
+        List<ViaHeader> viaHeaders = new ArrayList<>();
+        viaHeaders.add(headerFactory.createViaHeader(localIp, localSipPort, sipTransport, null));
+
+        CSeqHeader cSeqHeader = headerFactory.createCSeqHeader(cseq++, method);
+        Request request = messageFactory.createRequest(requestUri, method, sipProvider.getNewCallId(),
+                cSeqHeader, fromHeader, toHeader, viaHeaders, headerFactory.createMaxForwardsHeader(70));
+        request.addHeader(contactHeader);
+        return request;
+    }
+
+    private ContactHeader createContactHeader() throws Exception {
+        ensureActiveDeviceId();
+        SipURI contactUri = createUri(activeDeviceId, properties.getDomain());
+        contactUri.setHost(localIp);
+        contactUri.setPort(localSipPort);
+        contactUri.setTransportParam(sipTransport);
+        return headerFactory.createContactHeader(addressFactory.createAddress(contactUri));
+    }
+
+    private AuthorizationHeader buildAuthorization(String method, String uri) throws Exception {
+        WWWAuthenticateHeader challenge = authChallenge.get();
+        if (challenge == null) {
+            return null;
+        }
+        String cnonce = UUID.randomUUID().toString().replace("-", "").substring(0, 8);
+        String nc = "00000001";
+        String response = SipDigestAuth.computeResponse(
+                activeDeviceId, properties.getPassword(), challenge.getRealm(),
+                method, uri, challenge.getNonce(), nc, cnonce, challenge.getQop());
+
+        AuthorizationHeader auth = headerFactory.createAuthorizationHeader("Digest");
+        auth.setUsername(activeDeviceId);
+        auth.setRealm(challenge.getRealm());
+        auth.setNonce(challenge.getNonce());
+        auth.setURI(addressFactory.createURI(uri));
+        auth.setResponse(response);
+        auth.setAlgorithm("MD5");
+        if (challenge.getQop() != null && !challenge.getQop().isEmpty()) {
+            auth.setQop(challenge.getQop());
+            auth.setCNonce(cnonce);
+            auth.setNonceCount(Integer.parseInt(nc, 16));
+        }
+        return auth;
+    }
+
+    private void sendResponse(RequestEvent event, int code) throws Exception {
+        Response response = messageFactory.createResponse(code, event.getRequest());
+        if (code >= 200) {
+            ToHeader toHeader = (ToHeader) response.getHeader(ToHeader.NAME);
+            if (toHeader.getTag() == null) {
+                toHeader.setTag(generateTag());
+            }
+        }
+        ServerTransaction st = event.getServerTransaction();
+        if (st == null) {
+            st = sipProvider.getNewServerTransaction(event.getRequest());
+        }
+        st.sendResponse(response);
+    }
+
+    private SipURI createUri(String user, String domain) throws Exception {
+        return addressFactory.createSipURI(user, domain);
+    }
+
+    private void failPending(Gb28181StreamResult result) {
+        PendingStream session = pendingStream.getAndSet(null);
+        if (session != null) {
+            session.result = result;
+            session.inviteReady.countDown();
+            session.streamDone.countDown();
+        }
+    }
+
+    private static Gb28181StreamResult fail(String msg) {
+        List<String> errors = new ArrayList<>();
+        errors.add(msg);
+        return Gb28181StreamResult.builder().success(false).errors(errors).build();
+    }
+
+    private static List<String> singleError(String msg) {
+        List<String> errors = new ArrayList<>();
+        errors.add(msg);
+        return errors;
+    }
+
+    private static String maskUrl(String url) {
+        return url.replaceAll("://([^:]+):([^@]+)@", "://***:***@");
+    }
+
+    private static String normalizeTransport(String t) {
+        return t != null && t.equalsIgnoreCase("udp") ? "udp" : "tcp";
+    }
+
+    private static int findAvailablePort() throws Exception {
+        try (java.net.DatagramSocket socket = new java.net.DatagramSocket(0)) {
+            int port = socket.getLocalPort();
+            return port % 2 == 0 ? port : port + 1;
+        }
+    }
+
+    private static String generateTag() {
+        return Long.toHexString(System.nanoTime());
+    }
+
+    private void ensureActiveDeviceId() {
+        if (!StringUtils.hasText(activeDeviceId)) {
+            throw new IllegalStateException("设备 ID 未设置,请先注册");
+        }
+    }
+
+    private static void validateDeviceId(String deviceId) {
+        if (!StringUtils.hasText(deviceId)) {
+            throw new IllegalArgumentException("deviceId 不能为空");
+        }
+        String id = deviceId.trim();
+        if (id.length() != 20 || !id.chars().allMatch(Character::isDigit)) {
+            throw new IllegalArgumentException("deviceId 必须为 20 位数字");
+        }
+    }
+
+    private void shutdownStack() {
+        try {
+            if (sipProvider != null) {
+                sipProvider.removeSipListener(this);
+            }
+        } catch (Exception ignored) {
+        }
+        if (sipStack != null) {
+            try {
+                ((SipStackImpl) sipStack).stop();
+            } catch (Exception ignored) {
+            }
+        }
+    }
+
+    @Override
+    public void processResponse(ResponseEvent event) {
+        ClientTransaction ct = event.getClientTransaction();
+        if (ct == null || !Request.REGISTER.equals(ct.getRequest().getMethod())) {
+            return;
+        }
+        int status = event.getResponse().getStatusCode();
+        if (status == Response.UNAUTHORIZED || status == Response.PROXY_AUTHENTICATION_REQUIRED) {
+            WWWAuthenticateHeader challenge = (WWWAuthenticateHeader) event.getResponse().getHeader(WWWAuthenticateHeader.NAME);
+            if (challenge == null) {
+                challenge = (WWWAuthenticateHeader) event.getResponse().getHeader(ProxyAuthenticateHeader.NAME);
+            }
+            authChallenge.set(challenge);
+            if (!registerAuthSent.getAndSet(true)) {
+                try {
+                    ExpiresHeader expires = (ExpiresHeader) ct.getRequest().getHeader(ExpiresHeader.NAME);
+                    int expireSeconds = expires != null ? expires.getExpires() : properties.getRegisterExpires();
+                    sendRegister(true, expireSeconds);
+                } catch (Exception e) {
+                    log.error("REGISTER 鉴权失败: {}", e.getMessage());
+                }
+            }
+        } else if (status == Response.OK) {
+            ExpiresHeader expires = (ExpiresHeader) ct.getRequest().getHeader(ExpiresHeader.NAME);
+            if (expires != null && expires.getExpires() == 0) {
+                registered.set(false);
+                log.info("GB28181 注销成功: deviceId={}", activeDeviceId);
+            } else {
+                registered.set(true);
+                log.info("GB28181 注册成功: deviceId={}(平台应可见设备在线)", activeDeviceId);
+            }
+        } else if (status >= 300) {
+            registered.set(false);
+            log.warn("GB28181 注册失败: deviceId={}, status={}", activeDeviceId, status);
+        }
+    }
+
+    @Override
+    public void processRequest(RequestEvent event) {
+        String method = event.getRequest().getMethod();
+        log.info("收到 SIP 请求: {} from {}", method, formatRemote(event));
+        try {
+            if (Request.INVITE.equals(method)) {
+                handleIncomingInvite(event);
+            } else if (Request.ACK.equals(method)) {
+                handleIncomingAck(event);
+            } else if (Request.BYE.equals(method)) {
+                handleIncomingBye(event);
+            }
+        } catch (Exception e) {
+            log.error("处理 {} 失败: {}", method, e.getMessage(), e);
+            failPending(fail("处理 " + method + " 失败: " + e.getMessage()));
+        }
+    }
+
+    private String formatRemote(RequestEvent event) {
+        if (event instanceof RequestEventExt) {
+            RequestEventExt ext = (RequestEventExt) event;
+            return ext.getRemoteIpAddress() + ":" + ext.getRemotePort();
+        }
+        return properties.getServerHost();
+    }
+
+    @Override
+    public void processTimeout(TimeoutEvent timeoutEvent) {
+        log.warn("SIP 超时");
+    }
+
+    @Override
+    public void processIOException(IOExceptionEvent exceptionEvent) {
+        log.warn("SIP IO 异常: {}:{}", exceptionEvent.getHost(), exceptionEvent.getPort());
+        registered.set(false);
+    }
+
+    @Override
+    public void processTransactionTerminated(TransactionTerminatedEvent event) {
+    }
+
+    @Override
+    public void processDialogTerminated(DialogTerminatedEvent event) {
+    }
+
+    private static final class PendingStream {
+        private String rtspUrl;
+        private final int durationSeconds;
+        private final CountDownLatch inviteReady = new CountDownLatch(1);
+        private final CountDownLatch streamDone = new CountDownLatch(1);
+        private volatile boolean streamStarted;
+        private volatile Gb28181StreamResult result;
+        private String callId;
+        private int localRtpPort;
+        private String remoteRtpHost;
+        private int remoteRtpPort;
+        private SdpMediaInfo media;
+
+        private PendingStream(String rtspUrl, int durationSeconds) {
+            this.rtspUrl = rtspUrl;
+            this.durationSeconds = durationSeconds;
+        }
+    }
+}

+ 22 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/sip/device/Gb28181StreamResult.java

@@ -0,0 +1,22 @@
+package com.usky.cdi.service.sip.device;
+
+import lombok.Builder;
+import lombok.Data;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Data
+@Builder
+public class Gb28181StreamResult {
+
+    private boolean success;
+    private String rtspUrl;
+    private String callId;
+    private int localRtpPort;
+    private String remoteRtpHost;
+    private Integer remoteRtpPort;
+    private long packetsSent;
+    @Builder.Default
+    private List<String> errors = new ArrayList<>();
+}

+ 65 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/sip/ps/Gb28181PsMuxer.java

@@ -0,0 +1,65 @@
+package com.usky.cdi.service.sip.ps;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * GB28181 标准 PS 封装(与 gb28181_streampackageForH264 一致,FFmpeg -f mpeg 可解)。
+ * <p>
+ * I 帧:Pack + System + PSM + PES(Annex-B)<br>
+ * P 帧:Pack + PES(Annex-B)<br>
+ * 超大帧按 65522 字节分片,每片独立 Pack(I 帧每片带 System+PSM)。
+ */
+public final class Gb28181PsMuxer {
+
+    /** gb28181 PS_PES_PAYLOAD_SIZE */
+    private static final int MAX_PES_PAYLOAD = 65522;
+    private static final int VIDEO_STREAM_ID = 0xE0;
+
+    private Gb28181PsMuxer() {
+    }
+
+    public static byte[] mux(List<byte[]> nals, long pts90k, boolean keyFrame) throws IOException {
+        if (nals == null || nals.isEmpty()) {
+            return new byte[0];
+        }
+        return muxAnnexB(toAnnexB(nals), pts90k, keyFrame);
+    }
+
+    static byte[] muxAnnexB(byte[] annexB, long pts90k, boolean keyFrame) throws IOException {
+        if (annexB == null || annexB.length == 0) {
+            return new byte[0];
+        }
+        ByteArrayOutputStream out = new ByteArrayOutputStream(annexB.length + 256);
+        int offset = 0;
+        while (offset < annexB.length) {
+            int chunk = Math.min(annexB.length - offset, MAX_PES_PAYLOAD);
+            writePsChunk(out, annexB, offset, chunk, pts90k, keyFrame);
+            offset += chunk;
+        }
+        return out.toByteArray();
+    }
+
+    private static void writePsChunk(ByteArrayOutputStream out, byte[] annexB, int offset, int length,
+                                     long pts90k, boolean keyFrame) throws IOException {
+        out.write(Gb28181PsHeaders.buildStandardPackHeader(pts90k));
+        if (keyFrame) {
+            out.write(Gb28181PsHeaders.buildSystemHeader());
+            out.write(Gb28181PsHeaders.buildStandardPsm());
+        }
+        out.write(Gb28181PsHeaders.buildPesHeader(VIDEO_STREAM_ID, length, pts90k));
+        out.write(annexB, offset, length);
+    }
+
+    private static byte[] toAnnexB(List<byte[]> nals) throws IOException {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        for (byte[] nal : nals) {
+            if (nal != null && nal.length > 0) {
+                out.write(new byte[]{0x00, 0x00, 0x00, 0x01});
+                out.write(nal);
+            }
+        }
+        return out.toByteArray();
+    }
+}

+ 162 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/sip/ps/H264AccessUnitBuilder.java

@@ -0,0 +1,162 @@
+package com.usky.cdi.service.sip.ps;
+
+import com.usky.cdi.service.sip.rtp.AnnexBParser;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * 组装 Annex-B H264 访问单元,保证 I 帧前 SPS/PPS 顺序正确。
+ */
+public final class H264AccessUnitBuilder {
+
+    private byte[] sps;
+    private byte[] pps;
+    private boolean sentFirstKeyFrame;
+
+    public void updateParameterSets(List<byte[]> nals) {
+        for (byte[] nal : nals) {
+            if (nal.length == 0) {
+                continue;
+            }
+            int type = nal[0] & 0x1F;
+            if (type == 7) {
+                sps = nal;
+            } else if (type == 8) {
+                pps = nal;
+            }
+        }
+    }
+
+    public boolean hasParameterSets() {
+        return sps != null && pps != null;
+    }
+
+    public void setSps(byte[] sps) {
+        if (sps != null && sps.length > 0) {
+            this.sps = sps;
+        }
+    }
+
+    public void setPps(byte[] pps) {
+        if (pps != null && pps.length > 0) {
+            this.pps = pps;
+        }
+    }
+
+    public boolean isParameterSetOnly(List<byte[]> nals) {
+        if (nals.isEmpty()) {
+            return true;
+        }
+        for (byte[] nal : nals) {
+            if (nal.length == 0) {
+                continue;
+            }
+            int type = nal[0] & 0x1F;
+            if (type != 7 && type != 8) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    public boolean isKeyFrame(List<byte[]> nals) {
+        for (byte[] nal : nals) {
+            if (nal.length > 0 && (nal[0] & 0x1F) == 5) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * @return Annex-B 访问单元;未就绪返回 null
+     */
+    public byte[] build(byte[] packetData, boolean keyFrameHint) throws IOException {
+        List<byte[]> nals = AnnexBParser.extractNals(packetData);
+        updateParameterSets(nals);
+        if (nals.isEmpty() || isParameterSetOnly(nals)) {
+            return null;
+        }
+
+        boolean keyFrame = keyFrameHint || isKeyFrame(nals);
+        if (!keyFrame && !sentFirstKeyFrame) {
+            return null;
+        }
+        if (keyFrame && !hasParameterSets() && !(containsType(nals, 7) && containsType(nals, 8))) {
+            return null;
+        }
+
+        List<byte[]> ordered = orderNals(nals, keyFrame);
+        sentFirstKeyFrame = sentFirstKeyFrame || keyFrame;
+        return toAnnexB(ordered);
+    }
+
+    private List<byte[]> orderNals(List<byte[]> nals, boolean keyFrame) {
+        List<byte[]> ordered = new ArrayList<>();
+        if (keyFrame) {
+            if (sps != null && !containsType(nals, 7)) {
+                ordered.add(sps);
+            }
+            if (pps != null && !containsType(nals, 8)) {
+                ordered.add(pps);
+            }
+        }
+        List<byte[]> sorted = new ArrayList<>(nals);
+        sorted.sort(Comparator.comparingInt(H264AccessUnitBuilder::nalPriority));
+        for (byte[] nal : sorted) {
+            if (nal.length == 0) {
+                continue;
+            }
+            int type = nal[0] & 0x1F;
+            if (type == 7 && ordered.stream().anyMatch(n -> (n[0] & 0x1F) == 7)) {
+                continue;
+            }
+            if (type == 8 && ordered.stream().anyMatch(n -> (n[0] & 0x1F) == 8)) {
+                continue;
+            }
+            ordered.add(nal);
+        }
+        return ordered;
+    }
+
+    private static int nalPriority(byte[] nal) {
+        if (nal.length == 0) {
+            return 99;
+        }
+        switch (nal[0] & 0x1F) {
+            case 7:
+                return 0;
+            case 8:
+                return 1;
+            case 5:
+                return 2;
+            default:
+                return 3;
+        }
+    }
+
+    private static boolean containsType(List<byte[]> nals, int type) {
+        for (byte[] nal : nals) {
+            if (nal.length > 0 && (nal[0] & 0x1F) == type) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private static byte[] toAnnexB(List<byte[]> nals) throws IOException {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        for (byte[] nal : nals) {
+            if (nal.length == 0) {
+                continue;
+            }
+            out.write(new byte[]{0x00, 0x00, 0x00, 0x01});
+            out.write(nal);
+        }
+        return out.toByteArray();
+    }
+}

+ 257 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/sip/push/RtspGb28181RtpSender.java

@@ -0,0 +1,257 @@
+package com.usky.cdi.service.sip.push;
+
+import com.usky.cdi.service.sip.model.SdpMediaInfo;
+import com.usky.cdi.service.sip.ps.Gb28181PsMuxer;
+import com.usky.cdi.service.sip.ps.H264AccessUnitBuilder;
+import com.usky.cdi.service.sip.ps.H264ExtradataUtils;
+import com.usky.cdi.service.sip.rtp.AnnexBParser;
+import com.usky.cdi.service.sip.rtp.PsRtpPacketizer;
+import com.usky.cdi.service.sip.sdp.SdpUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.bytedeco.ffmpeg.avcodec.AVPacket;
+import org.bytedeco.ffmpeg.global.avcodec;
+import org.bytedeco.javacv.FFmpegFrameGrabber;
+
+import java.io.OutputStream;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * GB/T 28181 推流:JavaCV 拉 RTSP → 海康兼容 PS 封装 → RTP(PT=96)。
+ */
+@Slf4j
+public final class RtspGb28181RtpSender {
+
+    private static final long DEFAULT_SSRC = 100_000_001L;
+    private static final int DEFAULT_TIMESTAMP_INCREMENT = 3600;
+    private static final long RTSP_DATA_WAIT_MS = 20_000L;
+    private static final long FIRST_RTP_WAIT_MS = 45_000L;
+
+    private RtspGb28181RtpSender() {
+    }
+
+    public static long stream(String rtspUrl, SdpMediaInfo media, String rtpHost, int rtpPort,
+                              int localRtpPort, int durationSeconds, String rtspTransport) throws Exception {
+        int payloadType = media != null && media.getPayloadType() > 0 ? media.getPayloadType() : 96;
+        long ssrc = SdpUtils.parseSsrcValue(media != null ? media.getSsrc() : null, DEFAULT_SSRC);
+        boolean useTcp = media != null && media.isTcpMedia();
+        String answerSetup = media != null && "active".equalsIgnoreCase(media.getSetup()) ? "passive" : "active";
+
+        log.info("GB28181 PS 推流: target={}:{}, localRtpPort={}, ssrc={}, tcp={}",
+                rtpHost, rtpPort, localRtpPort, ssrc, useTcp);
+
+        InetAddress remoteAddress = InetAddress.getByName(rtpHost);
+        AtomicLong packetCount = new AtomicLong();
+        H264AccessUnitBuilder auBuilder = new H264AccessUnitBuilder();
+
+        try (RtpTransport transport = openTransport(useTcp, answerSetup, remoteAddress, rtpPort, localRtpPort)) {
+            FFmpegFrameGrabber grabber = createGrabber(rtspUrl, rtspTransport);
+            try {
+                grabber.start();
+                int videoStream = grabber.getVideoStream();
+                double fps = grabber.getFrameRate();
+                int timestampIncrement = fps > 1 && fps < 121
+                        ? (int) (90000 / fps) : DEFAULT_TIMESTAMP_INCREMENT;
+                H264ExtradataUtils.seedParameterSets(grabber, auBuilder);
+                log.info("RTSP 已连接: videoStream={}, fps={}, tsInc={}, hasSpsPps={}",
+                        videoStream, fps, timestampIncrement, auBuilder.hasParameterSets());
+
+                int sequence = 0;
+                long timestamp = 0;
+                long endTime = System.currentTimeMillis() + durationSeconds * 1000L;
+                long streamStart = System.currentTimeMillis();
+                long rtspDataDeadline = streamStart + RTSP_DATA_WAIT_MS;
+                long firstRtpDeadline = streamStart + FIRST_RTP_WAIT_MS;
+                long rtspVideoPackets = 0;
+                long psPacks = 0;
+
+                while (System.currentTimeMillis() < endTime) {
+                    AVPacket pkt = grabber.grabPacket();
+                    if (pkt == null || pkt.size() <= 0) {
+                        if (rtspVideoPackets == 0 && System.currentTimeMillis() > rtspDataDeadline) {
+                            throw new IllegalStateException(
+                                    "RTSP 在 " + (RTSP_DATA_WAIT_MS / 1000) + "s 内未收到视频包");
+                        }
+                        Thread.sleep(2);
+                        continue;
+                    }
+                    if (pkt.stream_index() != videoStream) {
+                        continue;
+                    }
+                    rtspVideoPackets++;
+
+                    byte[] data = new byte[pkt.size()];
+                    pkt.data().get(data, 0, pkt.size());
+                    boolean keyFrame = (pkt.flags() & avcodec.AV_PKT_FLAG_KEY) != 0;
+                    byte[] accessUnit = auBuilder.build(data, keyFrame);
+                    if (accessUnit == null) {
+                        if (packetCount.get() == 0 && System.currentTimeMillis() > firstRtpDeadline) {
+                            throw new IllegalStateException(
+                                    "等待首个可发送帧超时,已收 RTSP 包=" + rtspVideoPackets
+                                            + ",hasSpsPps=" + auBuilder.hasParameterSets());
+                        }
+                        continue;
+                    }
+
+                    List<byte[]> nals = AnnexBParser.extractNals(accessUnit);
+                    boolean psKeyFrame = keyFrame || auBuilder.isKeyFrame(nals);
+                    byte[] psFrame = Gb28181PsMuxer.mux(nals, timestamp, psKeyFrame);
+                    if (psFrame.length == 0) {
+                        continue;
+                    }
+                    sequence = sendPsFrame(transport, psFrame, payloadType, sequence, timestamp, ssrc, packetCount);
+                    timestamp += timestampIncrement;
+                    psPacks++;
+
+                    if (psPacks == 1) {
+                        log.info("首包 PS 已发送: psBytes={}, keyFrame={}, pesHead=[{}], rtspPackets={}",
+                                psFrame.length, psKeyFrame, hexHead(psFrame, 48), rtspVideoPackets);
+                    }
+                }
+
+                log.info("GB28181 推流统计: psPacks={}, rtpPackets={}, rtspVideoPackets={}",
+                        psPacks, packetCount.get(), rtspVideoPackets);
+                if (packetCount.get() == 0) {
+                    throw new IllegalStateException("推流结束但未发出 RTP");
+                }
+            } finally {
+                closeGrabber(grabber);
+            }
+        }
+        return packetCount.get();
+    }
+
+    private static int sendPsFrame(RtpTransport transport, byte[] psFrame, int payloadType,
+                                   int sequence, long timestamp, long ssrc, AtomicLong packetCount) throws Exception {
+        List<byte[]> rtpPackets = PsRtpPacketizer.packetize(psFrame, payloadType, sequence, timestamp, ssrc);
+        for (byte[] rtp : rtpPackets) {
+            transport.send(rtp);
+            packetCount.incrementAndGet();
+        }
+        return (sequence + rtpPackets.size()) & 0xFFFF;
+    }
+
+    private static FFmpegFrameGrabber createGrabber(String rtspUrl, String rtspTransport) {
+        FFmpegFrameGrabber grabber = new FFmpegFrameGrabber(rtspUrl);
+        grabber.setFormat("rtsp");
+        grabber.setOption("rtsp_transport", rtspTransport != null ? rtspTransport : "tcp");
+        grabber.setOption("stimeout", "5000000");
+        grabber.setOption("max_delay", "500000");
+        grabber.setOption("fflags", "nobuffer");
+        grabber.setOption("flags", "low_delay");
+        return grabber;
+    }
+
+    private static void closeGrabber(FFmpegFrameGrabber grabber) {
+        try {
+            grabber.stop();
+            grabber.release();
+        } catch (Exception e) {
+            log.debug("关闭 RTSP: {}", e.getMessage());
+        }
+    }
+
+    private static String hexHead(byte[] data, int max) {
+        if (data == null || data.length == 0) {
+            return "";
+        }
+        int n = Math.min(data.length, max);
+        StringBuilder sb = new StringBuilder(n * 3);
+        for (int i = 0; i < n; i++) {
+            if (i > 0) {
+                sb.append(' ');
+            }
+            sb.append(String.format("%02X", data[i] & 0xFF));
+        }
+        return sb.toString();
+    }
+
+    private static RtpTransport openTransport(boolean useTcp, String answerSetup, InetAddress remoteHost,
+                                              int remotePort, int localPort) throws Exception {
+        if (!useTcp) {
+            return new UdpTransport(remoteHost, remotePort, localPort);
+        }
+        if ("active".equalsIgnoreCase(answerSetup)) {
+            Socket socket = new Socket(remoteHost, remotePort);
+            socket.setTcpNoDelay(true);
+            return new TcpTransport(socket);
+        }
+        ServerSocket serverSocket = new ServerSocket(localPort);
+        serverSocket.setSoTimeout(30000);
+        Socket socket = serverSocket.accept();
+        serverSocket.close();
+        socket.setTcpNoDelay(true);
+        return new TcpTransport(socket);
+    }
+
+    private interface RtpTransport extends AutoCloseable {
+        void send(byte[] rtp) throws Exception;
+
+        @Override
+        void close();
+    }
+
+    private static final class UdpTransport implements RtpTransport {
+        private final DatagramSocket socket;
+        private final DatagramPacket template;
+
+        UdpTransport(InetAddress host, int port, int localPort) throws Exception {
+            if (localPort > 0) {
+                this.socket = new DatagramSocket(localPort);
+                log.info("RTP 绑定本地端口: {}", localPort);
+            } else {
+                this.socket = new DatagramSocket();
+            }
+            this.template = new DatagramPacket(new byte[0], 0, host, port);
+        }
+
+        @Override
+        public void send(byte[] rtp) throws Exception {
+            template.setData(rtp);
+            template.setLength(rtp.length);
+            socket.send(template);
+        }
+
+        @Override
+        public void close() {
+            socket.close();
+        }
+    }
+
+    private static final class TcpTransport implements RtpTransport {
+        private final Socket socket;
+        private final OutputStream out;
+
+        TcpTransport(Socket socket) throws Exception {
+            this.socket = socket;
+            this.out = socket.getOutputStream();
+        }
+
+        @Override
+        public void send(byte[] rtp) throws Exception {
+            byte[] frame = new byte[rtp.length + 2];
+            frame[0] = (byte) ((rtp.length >> 8) & 0xFF);
+            frame[1] = (byte) (rtp.length & 0xFF);
+            System.arraycopy(rtp, 0, frame, 2, rtp.length);
+            out.write(frame);
+            out.flush();
+        }
+
+        @Override
+        public void close() {
+            try {
+                out.close();
+            } catch (Exception ignored) {
+            }
+            try {
+                socket.close();
+            } catch (Exception ignored) {
+            }
+        }
+    }
+}

+ 90 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/sip/push/RtspH264RtpSender.java

@@ -0,0 +1,90 @@
+package com.usky.cdi.service.sip.push;
+
+import com.usky.cdi.service.sip.rtp.AnnexBParser;
+import com.usky.cdi.service.sip.rtp.H264RtpPacketizer;
+import lombok.extern.slf4j.Slf4j;
+import org.bytedeco.ffmpeg.avcodec.AVPacket;
+import org.bytedeco.javacv.FFmpegFrameGrabber;
+
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.util.List;
+
+/**
+ * 从 RTSP 拉取 H264 码流并封装为 RTP 发送到目标地址
+ */
+@Slf4j
+public class RtspH264RtpSender {
+
+    private static final int PAYLOAD_TYPE = 96;
+    private static final long SSRC = 0x12345678L;
+    private static final int TIMESTAMP_INCREMENT = 3000;
+
+    private RtspH264RtpSender() {
+    }
+
+    public static long stream(String rtspUrl, String rtpHost, int rtpPort, int durationSeconds,
+                              String rtspTransport) throws Exception {
+        InetAddress address = InetAddress.getByName(rtpHost);
+        FFmpegFrameGrabber grabber = new FFmpegFrameGrabber(rtspUrl);
+        grabber.setFormat("rtsp");
+        grabber.setOption("rtsp_transport", rtspTransport != null ? rtspTransport : "tcp");
+        grabber.setOption("stimeout", "5000000");
+        grabber.setOption("max_delay", "500000");
+        grabber.setOption("fflags", "nobuffer");
+        grabber.setOption("flags", "low_delay");
+
+        long packetCount = 0;
+        int sequence = 0;
+        long timestamp = 0;
+        long endTime = System.currentTimeMillis() + durationSeconds * 1000L;
+
+        try (DatagramSocket socket = new DatagramSocket()) {
+            grabber.start();
+            int videoStream = grabber.getVideoStream();
+            log.info("RTSP 已连接: {}, videoStream={}", maskUrl(rtspUrl), videoStream);
+
+            while (System.currentTimeMillis() < endTime) {
+                AVPacket pkt = grabber.grabPacket();
+                if (pkt == null || pkt.size() <= 0) {
+                    Thread.sleep(5);
+                    continue;
+                }
+                if (pkt.stream_index() != videoStream) {
+                    continue;
+                }
+                byte[] data = new byte[pkt.size()];
+                pkt.data().get(data, 0, pkt.size());
+                List<byte[]> nals = AnnexBParser.extractNals(data);
+                for (byte[] nal : nals) {
+                    List<byte[]> rtpPackets = H264RtpPacketizer.packetize(
+                            nal, PAYLOAD_TYPE, sequence, timestamp, SSRC);
+                    for (byte[] rtp : rtpPackets) {
+                        DatagramPacket dg = new DatagramPacket(rtp, rtp.length, address, rtpPort);
+                        socket.send(dg);
+                        packetCount++;
+                        sequence++;
+                    }
+                }
+                timestamp += TIMESTAMP_INCREMENT;
+            }
+        } finally {
+            try {
+                grabber.stop();
+                grabber.release();
+            } catch (Exception e) {
+                log.debug("关闭 RTSP: {}", e.getMessage());
+            }
+        }
+        log.info("RTSP 推流结束: target={}:{}, rtpPackets={}", rtpHost, rtpPort, packetCount);
+        return packetCount;
+    }
+
+    private static String maskUrl(String rtspUrl) {
+        if (rtspUrl == null) {
+            return "";
+        }
+        return rtspUrl.replaceAll("://([^:]+):([^@]+)@", "://***:***@");
+    }
+}

+ 28 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/sip/push/SipVideoPusherFactory.java

@@ -0,0 +1,28 @@
+package com.usky.cdi.service.sip.push;
+
+import com.usky.cdi.service.sip.config.SipClientProperties;
+import com.usky.cdi.service.sip.device.Gb28181DeviceSimulator;
+import com.usky.cdi.service.sip.device.Gb28181StreamResult;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
+
+/**
+ * 从 Spring 配置构建 GB28181 推流(委托设备模拟器)。
+ */
+@Component
+@ConditionalOnProperty(prefix = "sip.client", name = "enabled", havingValue = "true")
+public class SipVideoPusherFactory {
+
+    private final Gb28181DeviceSimulator deviceSimulator;
+    private final SipClientProperties properties;
+
+    public SipVideoPusherFactory(Gb28181DeviceSimulator deviceSimulator, SipClientProperties properties) {
+        this.deviceSimulator = deviceSimulator;
+        this.properties = properties;
+    }
+
+    public Gb28181StreamResult pushRtsp(String rtspUrl, Integer durationSeconds) {
+        int duration = durationSeconds != null ? durationSeconds : properties.getDefaultDurationSeconds();
+        return deviceSimulator.bindRtspAndWait(rtspUrl, properties.getInviteWaitSeconds(), duration);
+    }
+}

+ 78 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/sip/rtp/AnnexBParser.java

@@ -0,0 +1,78 @@
+package com.usky.cdi.service.sip.rtp;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * 从 FFmpeg AVPacket 数据中解析 H264 NAL(Annex B 或 AVCC)
+ */
+public final class AnnexBParser {
+
+    private AnnexBParser() {
+    }
+
+    public static List<byte[]> extractNals(byte[] data) {
+        if (data == null || data.length == 0) {
+            return new ArrayList<>();
+        }
+        if (isAnnexB(data)) {
+            return splitAnnexB(data);
+        }
+        return splitAvcc(data);
+    }
+
+    private static boolean isAnnexB(byte[] data) {
+        return data.length >= 3 && data[0] == 0 && data[1] == 0
+                && (data[2] == 1 || (data.length >= 4 && data[2] == 0 && data[3] == 1));
+    }
+
+    private static List<byte[]> splitAnnexB(byte[] data) {
+        List<byte[]> nals = new ArrayList<>();
+        List<Integer> codeStarts = new ArrayList<>();
+        List<Integer> nalBegins = new ArrayList<>();
+        for (int i = 0; i < data.length - 3; i++) {
+            if (data[i] == 0 && data[i + 1] == 0 && data[i + 2] == 1) {
+                codeStarts.add(i);
+                nalBegins.add(i + 3);
+                i += 2;
+            } else if (i + 3 < data.length && data[i] == 0 && data[i + 1] == 0
+                    && data[i + 2] == 0 && data[i + 3] == 1) {
+                codeStarts.add(i);
+                nalBegins.add(i + 4);
+                i += 3;
+            }
+        }
+        if (nalBegins.isEmpty()) {
+            nals.add(data);
+            return nals;
+        }
+        for (int s = 0; s < nalBegins.size(); s++) {
+            int begin = nalBegins.get(s);
+            int end = (s + 1 < codeStarts.size()) ? codeStarts.get(s + 1) : data.length;
+            if (begin < end) {
+                nals.add(Arrays.copyOfRange(data, begin, end));
+            }
+        }
+        return nals;
+    }
+
+    private static List<byte[]> splitAvcc(byte[] data) {
+        List<byte[]> nals = new ArrayList<>();
+        int pos = 0;
+        while (pos + 4 <= data.length) {
+            int len = ((data[pos] & 0xFF) << 24) | ((data[pos + 1] & 0xFF) << 16)
+                    | ((data[pos + 2] & 0xFF) << 8) | (data[pos + 3] & 0xFF);
+            pos += 4;
+            if (len <= 0 || pos + len > data.length) {
+                break;
+            }
+            nals.add(Arrays.copyOfRange(data, pos, pos + len));
+            pos += len;
+        }
+        if (nals.isEmpty() && data.length > 0) {
+            nals.add(data);
+        }
+        return nals;
+    }
+}

+ 46 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/sip/rtp/H264RtpPacketizer.java

@@ -0,0 +1,46 @@
+package com.usky.cdi.service.sip.rtp;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * H264 NAL 单元 RTP 封装(Single NAL / FU-A)
+ */
+public final class H264RtpPacketizer {
+
+    private static final int MAX_PAYLOAD = 1400;
+
+    private H264RtpPacketizer() {
+    }
+
+    public static List<byte[]> packetize(byte[] nal, int payloadType, int startSequence,
+                                         long timestamp, long ssrc) {
+        List<byte[]> packets = new ArrayList<>();
+        if (nal == null || nal.length == 0) {
+            return packets;
+        }
+        if (nal.length <= MAX_PAYLOAD) {
+            packets.add(RtpPacketBuilder.build(nal, payloadType, startSequence, timestamp, ssrc, true));
+            return packets;
+        }
+        int nalType = nal[0] & 0x1F;
+        int nalHeader = nal[0] & 0xE0;
+        int offset = 1;
+        int remaining = nal.length - 1;
+        int seq = startSequence;
+        boolean first = true;
+        while (remaining > 0) {
+            int chunk = Math.min(remaining, MAX_PAYLOAD - 2);
+            boolean last = chunk == remaining;
+            byte[] payload = new byte[2 + chunk];
+            payload[0] = (byte) (nalHeader | 28);
+            payload[1] = (byte) ((first ? 0x80 : 0) | (last ? 0x40 : 0) | nalType);
+            System.arraycopy(nal, offset, payload, 2, chunk);
+            packets.add(RtpPacketBuilder.build(payload, payloadType, seq++, timestamp, ssrc, last));
+            offset += chunk;
+            remaining -= chunk;
+            first = false;
+        }
+        return packets;
+    }
+}

+ 35 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/sip/rtp/PsRtpPacketizer.java

@@ -0,0 +1,35 @@
+package com.usky.cdi.service.sip.rtp;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * GB28181 PS over RTP:同一 PS 帧分片共用 timestamp,末包置 M 位。
+ */
+public final class PsRtpPacketizer {
+
+    /** GB28181 常用 RTP 负载上限 */
+    public static final int MAX_PAYLOAD = 1400;
+
+    private PsRtpPacketizer() {
+    }
+
+    public static List<byte[]> packetize(byte[] psFrame, int payloadType, int startSequence,
+                                         long timestamp, long ssrc) {
+        List<byte[]> packets = new ArrayList<>();
+        if (psFrame == null || psFrame.length == 0) {
+            return packets;
+        }
+        int offset = 0;
+        int seq = startSequence;
+        while (offset < psFrame.length) {
+            int chunk = Math.min(psFrame.length - offset, MAX_PAYLOAD);
+            boolean marker = offset + chunk >= psFrame.length;
+            byte[] payload = Arrays.copyOfRange(psFrame, offset, offset + chunk);
+            packets.add(RtpPacketBuilder.build(payload, payloadType, seq++, timestamp, ssrc, marker));
+            offset += chunk;
+        }
+        return packets;
+    }
+}

+ 29 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/sip/rtp/RtpPacketBuilder.java

@@ -0,0 +1,29 @@
+package com.usky.cdi.service.sip.rtp;
+
+/**
+ * RTP 包组装
+ */
+public final class RtpPacketBuilder {
+
+    private RtpPacketBuilder() {
+    }
+
+    public static byte[] build(byte[] payload, int payloadType, int sequenceNumber,
+                               long timestamp, long ssrc, boolean marker) {
+        byte[] packet = new byte[12 + payload.length];
+        packet[0] = (byte) 0x80;
+        packet[1] = (byte) ((marker ? 0x80 : 0) | (payloadType & 0x7F));
+        packet[2] = (byte) (sequenceNumber >> 8);
+        packet[3] = (byte) (sequenceNumber & 0xFF);
+        packet[4] = (byte) (timestamp >> 24);
+        packet[5] = (byte) (timestamp >> 16);
+        packet[6] = (byte) (timestamp >> 8);
+        packet[7] = (byte) (timestamp);
+        packet[8] = (byte) (ssrc >> 24);
+        packet[9] = (byte) (ssrc >> 16);
+        packet[10] = (byte) (ssrc >> 8);
+        packet[11] = (byte) (ssrc);
+        System.arraycopy(payload, 0, packet, 12, payload.length);
+        return packet;
+    }
+}

+ 10 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/sip/sdp/SdpUtils.java

@@ -164,6 +164,16 @@ public final class SdpUtils {
         return "active";
     }
 
+    /** SIP 服务端 200 OK 应答 SDP(简化版) */
+    public static String buildVideoAnswerSdp(String localIp, int localRtpPort,
+                                             int payloadType, String codec) {
+        SdpMediaInfo media = new SdpMediaInfo();
+        media.setPayloadType(payloadType);
+        media.setCodec(codec != null ? codec : (payloadType == 96 ? "PS" : "H264"));
+        media.setSsrc("0100000001");
+        return buildGb28181AnswerSdp(localIp, localRtpPort, media, "34020000001320000001", media.getSsrc());
+    }
+
     /** @deprecated 使用 {@link #buildGb28181AnswerSdp} */
     public static String buildGb28181InviteSdp(String localIp, int localRtpPort,
                                                 int payloadType, String deviceId, String ssrc) {

+ 39 - 0
service-cdi/service-cdi-biz/src/main/java/com/usky/cdi/service/sip/util/SipDigestAuth.java

@@ -0,0 +1,39 @@
+package com.usky.cdi.service.sip.util;
+
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+/**
+ * SIP Digest 认证(GB28181 注册)
+ */
+public final class SipDigestAuth {
+
+    private SipDigestAuth() {
+    }
+
+    public static String computeResponse(String username, String password, String realm,
+                                           String method, String uri, String nonce,
+                                           String nc, String cnonce, String qop) {
+        String ha1 = md5Hex(username + ":" + realm + ":" + password);
+        String ha2 = md5Hex(method + ":" + uri);
+        if (qop != null && !qop.isEmpty()) {
+            return md5Hex(ha1 + ":" + nonce + ":" + nc + ":" + cnonce + ":" + qop + ":" + ha2);
+        }
+        return md5Hex(ha1 + ":" + nonce + ":" + ha2);
+    }
+
+    public static String md5Hex(String input) {
+        try {
+            MessageDigest md = MessageDigest.getInstance("MD5");
+            byte[] digest = md.digest(input.getBytes(StandardCharsets.UTF_8));
+            StringBuilder sb = new StringBuilder();
+            for (byte b : digest) {
+                sb.append(String.format("%02x", b));
+            }
+            return sb.toString();
+        } catch (NoSuchAlgorithmException e) {
+            throw new IllegalStateException("MD5 not available", e);
+        }
+    }
+}