凌霄的博客
Camera开发系列之六-使用mina框架实现视频推流
Camera开发系列之六-使用mina框架实现视频推流

https://blog-1252348761.cos.ap-chengdu.myqcloud.com/http/QQ%E5%9B%BE%E7%89%8720190415102530.png

章节

Camera开发系列之一-显示摄像头实时画面

Camera开发系列之二-相机预览数据回调

Camera开发系列之三-相机数据硬编码为h264

Camera开发系列之四-使用MediaMuxer封装编码后的音视频到mp4容器

Camera开发系列之五-使用MediaExtractor制作一个简易播放器

Camera开发系列之六-使用mina框架实现视频推流

Camera开发系列之七-使用GLSurfaceviw绘制Camera预览画面

mina是什么

MINA框架是对java的NIO包的一个封装,简化了NIO程序开发的难度,封装了很多底层的细节,让开发者把精力集中到业务逻辑上来。说的简单一点,它就是一个简单但功能齐全的网络应用框架,它的作者是现在大名鼎鼎的服务端Netty框架的作者。如果对mina框架不够了解,

请先看这篇文章:mina框架简单介绍

请先看这篇文章:mina框架简单介绍

请先看这篇文章:mina框架简单介绍

为什么要用mina框架

视频分为本地视频和视频流,本地视频即为已经下载好到本地的视频,常见格式为MP4,WMV,AVI等格式,视频流多见于直播中,其中常见格式为RTSP流媒体,RTMP流,m3u8 流媒体,MMS 流。

而这些直播协议底层无外乎都是通过socket实现的,想要了解它们的原理,可以自己使用socket实现推流,来抛砖引玉。但是使用socket实现不太现实,工作量很大,而mina是封装了socket的轻量级框架,用在这上面再合适不过了。

那么,怎么实现camera推流呢,让我们将问题拆分一下:

  1. 指定数据收发协议,客户端和服务端需要制定一套规则来进行沟通
  2. 服务端和客户端建立连接
  3. 服务端将视频数据编码,然后发送给客户端
  4. 客户端收到消息,将视频数据解码播放
  5. 断线重连问题

然后看一下最终实现的效果,因为是1080p的,而且网络带宽不太好,有一些延迟:

https://blog-1252348761.cos.ap-chengdu.myqcloud.com/http/c6c7bb5f3cf8cc2c7759bbe6ec4fb2f2.GIF

数据传输协议-TLV协议

数据传输协议使用的是TLV协议,即Type-Length-Value,它是一种简单实用的数据传输方案。Type指数据类型,Length指发送的数据长度,Value指数据本身。其中Value又可以再嵌套TLV,拓展性非常高。

可能就这样说数据传输协议有点抽象,后面会在代码中体现出来。数据类型定义出来辽,可以开始编写发送接收数据了。

先定义一个视频数据类:

public class VideoStreamModel {
    private int type; //视频格式
    private int width; //视频宽
    private int height; //视频高
    private long seq_no0; //帧数 下同
    private long seq_no1;
    private byte[] video; //视频数据
    //···省略get/set方法
}

有了数据类,服务端就可以通过mina的IoSession来发送视频消息啦。IoSession可以理解为客户端和服务端通信的桥梁。想要具体了解可以看之前的文章。

服务端发送数据

在解码h264数据的回调中,将数据通过IoSession发送给客户端:

public void onH264DataFrame(byte[] h264, int width, int height) {
                //硬编码之后的h264数据
                byte[] h264Data = Arrays.copyOf(h264, h264.length);
                VideoStreamModel model = new VideoStreamModel();
                mSeq_no0++;
                mSeq_no1++;
                model.setType(2);
                model.setWidth(width);
                model.setHeight(height);
                model.setSeq_no0(mSeq_no0);
                model.setSeq_no1(mSeq_no1);
                model.setVideo(h264Data);
                sendTo(model);
}
private IoSession session;
public String sendTo(VideoStreamModel streamModel) {
        if (null == streamModel){
            return "data is null!";
        }
        byte[] data = streamModel.getVideo();
        IoBuffer buffer = IoBuffer.allocate(8 + 24 + data.length);
        buffer.order(ByteOrder.LITTLE_ENDIAN);
        buffer.putInt(MESSAGE_ID_STRAME);
        buffer.putInt(24 + data.length);
        //下面的数据占用24个字节
        buffer.putInt(streamModel.getType());
        buffer.putInt(streamModel.getWidth());
        buffer.putInt(streamModel.getHeight());
        buffer.putUnsignedInt(streamModel.getSeq_no0());
        buffer.putUnsignedInt(streamModel.getSeq_no1());
        buffer.putInt(data.length);

        buffer.put(data);
        buffer.flip();
        session.write(buffer);
        Log.i(TAG,"buffer的limit:"+buffer.limit()+"  len: " + (24 + data.length)
        +"messageID: "+Constants.createType(Constants.MESSAGE_ID_STRAME));
        return null;
    }

上面的IoBuffer.allocate(8 + 24 + data.length)意思是开辟内存空间,空间大小为8 + 24 + data.length,这个值是怎么计算出来的呢?

可以看到,上面有一个常量MESSAGE_ID_STRAME,这个是我自定义的int类型常量,用于标识数据的类型,也就是TLV中的Type。由于是int类型,所以占用4个字节。

第二个放入的是24 + data.length,这个是数据的长度,即Length。同样是int类型,占用4个字节

最后是数据本身,即VideoStreamModel这个数据类再加上了视频数据的大小,占用的字节大小为:4x6 + data.length。

这样一个TLV数据发送协议就写好啦,是不是很简单呢。

https://blog-1252348761.cos.ap-chengdu.myqcloud.com/http/B5CA2DDFFBDCBB1C598BD3A3E776AA72.jpg

客户端接收数据

数据发送协议写好了,那客户端怎么收到这些数据并处理呢?
在客户端的IoHandlerAdapter的messageReceived方法中,可以对数据进行处理:

public void messageReceived(IoSession session, Object message) throws Exception {
            //super.messageReceived(session, message);
            IoBuffer buffer = (IoBuffer) message;
            buffer.order(ByteOrder.LITTLE_ENDIAN);
            int type = buffer.getInt();
            int messageID = (type & 0x3FF);
            int len = buffer.getInt();
            Log.i(TAG,"messageID:"+messageID+"  type: "+type);
            switch (messageID) {
                case MESSAGE_ID_STRAME:
                    if(buffer.remaining() < 4){
                        return;
                    }
                    int type1 = buffer.getInt();
                    if(buffer.remaining() < 24){
                        return;
                    }
                    int width = buffer.getInt();
                    int height = buffer.getInt();
                    long seq_no0 = buffer.getUnsignedInt();
                    long seq_no1 = buffer.getUnsignedInt();

                    int bufferSize = buffer.getInt(); //视频帧大小
                    if(buffer.remaining() < bufferSize) {
                        return;
                    }
                    // 做个最大判定,不能太大了!
                    if(bufferSize > 1024 * 1024) {
                        return;
                    }
                    byte[] h264Segment = new byte[bufferSize];
                    buffer.get(h264Segment);
                    H264Decoder.getInstance().handleH264(h264Segment);
                    break;
            }
        }

上面的代码其实就是将服务端发送过来的数据进行解析,然后播放。这里只需要注意IoBuffer的使用,视频如果无法播放,一般都是发送数据或者接收数据的时候对数据长度处理不当造成的。

粘包、拆包处理

你以为处理好上面的那些问题就可以正常使用了么?天真!熟悉TCP的同学可能知道TCP有一个滑动窗口的概念,那就能很好的理解数据的拆包粘包现象了,如果不知道也没关心,它大概就是将数据分开发送的一个东西。

https://blog-1252348761.cos.ap-chengdu.myqcloud.com/http/DD65D46B01975BB5ADC28940A9A8861F.jpg

为啥要将数据分开发送呢,比如传输一个10m大小的文件,不分包一次性发送这么大量的数据,首先TCP/IP协议中,TCP的下一层IP层就不同意,因为IP数据包的负载是有限的,大概为1480个字节,其次是不分包的方式是不合理的,会存在极大的资源浪费。

分包之后,伴随的问题就出来了:
https://blog-1252348761.cos.ap-chengdu.myqcloud.com/http/%E7%B2%98%E5%8C%85_%E6%8B%86%E5%8C%85.png

socket的缓冲区大小是固定的,如果数据包比较大,只会把这个包前面部分的数据发送出去,毕竟咱们都是绅士,不能强人所难,硬要人家把这个数据包都传过去吧,可以看到第一个数据包发生了拆包。

紧接着的是第二个socket来了,它负责把第一个数据包的后面部分带上,并且还有空闲的空间,勤俭节约是中华名族的传统美德,既然你还有空间,那就把后面数据包的数据也带上一些吧,这时候两个数据包的数据都在一个socket中,也就是发生了粘包。

知道了粘包拆包是怎么产生的,解决方法也就很简单了,只需要事先知道数据的起始位置,以及每个数据包的大小,也就是TLV中的length,就可以组装value了。

粘包,拆包都是数据发送端自动完成的,所以只需要在数据接收端进行处理,下面是自定义mina解码器解决粘包、拆包问题,继承CumulativeProtocolDecoder这个类,它是累积性的协议解码器,也就是说只要有数据发送过来,这个类就会去读取数据,然后累积到内部的 IoBuffer 缓冲区,但是具体的拆包交由子类的 doDecode()方法完成:

public final class TLVDecoder extends CumulativeProtocolDecoder {
    @Override
    protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
        if (in.remaining() >= 8) { // 前8字节是包头+长度 两个都是int 所以是8
            // 标记当前position的快照标记mark,以便后继的reset操作能恢复position位置
            in.mark();

            in.order(ByteOrder.LITTLE_ENDIAN);
            @SuppressWarnings("unused")
            int type = in.getInt();
            int len = in.getInt();

            // 注意上面的get操作会导致下面的remaining()值发生变化
            if (in.remaining() < len) {
                // 如果消息内容不够,则重置恢复position位置到操作前,进入下一轮, 接收新数据,以拼凑成完整数据
                in.reset();
                return false;
            } else {
                // 消息内容足够
                in.reset(); // 重置恢复position位置到操作前
                int totalLen = (int) (8 + len); // 总长 = 包头+包体

                byte[] packArr = new byte[totalLen];
                in.get(packArr, 0, totalLen);

                IoBuffer buffer = IoBuffer.allocate(totalLen);
                buffer.put(packArr);
                buffer.flip();
                out.write(buffer);
                buffer.free();

                if (in.remaining() > 0) { // 如果读取一个完整包内容后还粘了包,就让父类再调用一次,进行下一次解析
                    return true;
                }
            }
        }
        return false; // 处理成功,让父类进行接收下个包
    }
}

自定义编码器,肥肠简单,这里继承的ProtocolEncoderAdapter是将java中的对象转换为字节流发送,是mina框架封装的编码器:

public final class TLVEncoder extends ProtocolEncoderAdapter {
    @Override
    public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
        out.write(message);
        out.flush();
    }
}

编解码器定义好了,在编解码器工厂中进行初始化,实现了ProtocolCodecFactory,mina中的编解码工厂都需要实现这个接口,比如我们之前用的TextLineCodecFactory

public final class TLVCodecFactory implements ProtocolCodecFactory {
    private TLVDecoder decoder;
    private TLVEncoder encoder;

    public TLVCodecFactory() {
        decoder = new TLVDecoder();
        encoder = new TLVEncoder();
    }

    @Override
    public ProtocolEncoder getEncoder(IoSession session) throws Exception {
        return encoder;
    }

    @Override
    public ProtocolDecoder getDecoder(IoSession session) throws Exception {
        return decoder;
    }
}

初始化mina的时候传这个编解码工厂就行了,

TLVCodecFactory codecFactory = new TLVCodecFactory();
ProtocolCodecFilter codecFilter = new ProtocolCodecFilter(codecFactory);
streamConnection = new NioSocketConnector();
streamConnection.getFilterChain().addLast("tlv", codecFilter);

断线重连

网络应用难免会遇到网络不稳定的时候,这时候连接有可能会断开,断开之后需要做自动断线重连,给用户一个较好的体验,mina有一个IoServiceListener接口,用于监听当前会话的状态,我们实现这个接口,就可以在会话异常销毁的时候,进行重连:

private final class StreamAutoReconnectHandler implements IoServiceListener {

        @Override
        public void serviceActivated(IoService ioService) throws Exception {

        }

        @Override
        public void serviceIdle(IoService ioService, IdleStatus idleStatus) throws Exception {

        }

        @Override
        public void serviceDeactivated(IoService ioService) throws Exception {

        }

        @Override
        public void sessionCreated(IoSession ioSession) throws Exception {

        }

        @Override
        public void sessionClosed(IoSession ioSession) throws Exception {

        }

        @Override
        public void sessionDestroyed(IoSession ioSession) throws Exception {
            // 不是主动断开连接的,需要重连
            while(m_state == State.Connected || m_state == State.ReConnecting) {
                try {
                    ConnectFuture future = streamConnection.connect();
                    future.awaitUninterruptibly();// 等待连接创建成功
                    streamSession = future.getSession();// 获取会话
                    if(m_state != State.Connected && m_state != State.ReConnecting) {
                        Log.i(TAG,"不需要断线重连");
                        streamSession.closeNow();
                        break;
                    }
                    if (streamSession != null) {
                        m_state = State.Connected;
                        break;
                    }
                    Log.i(TAG,"断线重连");
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }

完整代码已上传至github:camera开发系列

发表评论

textsms
account_circle
email

Camera开发系列之六-使用mina框架实现视频推流
章节 Camera开发系列之一-显示摄像头实时画面 Camera开发系列之二-相机预览数据回调 Camera开发系列之三-相机数据硬编码为h264 Camera开发系列之四-使用MediaMuxer封装编码后的音视频…
扫描二维码继续阅读
2019-04-15