Rtsp流媒体服务器小结

更新时间:2023-11-14 03:35:01 阅读量: 教育文库 文档下载

说明:文章内容仅供预览,部分内容可能不全。下载后的文档,内容与下面显示的完全一致。下载之前请确认下面内容是否您想要的,是否完整无缺。

Darwin Streaming Server流媒体服务器小结

Darwin Streaming Server(简称DSS)是QuickTime Streaming Server开放式源代码的版本,同时支持FreeBSD、Linux、Solaris、Windows NT和Windows 2000等多个操作系统,是当前所有同类产品中支持平台最多的一个。

DSS源代码完全采用标准C++语言写成,每个C++类都对应着一对和类同名的.h/.cpp文件。

整个服务器包括多个子系统,分别存放在独立的工程内,其中,最为重要的是基础功能类库(CommonUtilitiesLib)和流化服务器(StreamingServer)两个工程,前者是整个系统的通用代码工具箱,包括了线程管理、数据结构、网络和文本分析等多个功能模块。后者包含了DSS对多个国际标准的实现,是整个服务器的主工程。DSS实现了四种IETF制定的国际标准,分别是:实时流传输协议RTSP(Real-time Streaming Protocol, RFC 2326)、实时传输协议(RTP Real-time Transfer Protocol,RFC 1889)、实时传输控制协议RTCP(Real-time Transport Control Protocol,RFC 1889)、会话描述协议SDP(Session Description Protocol,RFC 2327)。

核心服务器通过创建四种类型的线程来完成自己的工作,具体如下: 1.主线程(main thread):这个线程负责检查服务器是否需要关闭,记录状态信息,或者打印统计信息。 2.空闲任务线程(Idle Task thread):空闲任务线程管理一个周期性的任务队列,该任务队列有两种类型:超时任务和套接口任务。 3.事件线程(Event thread):事件线程负责侦听套接口事件,比如收到RTSP请求和RTP数据包,然后把事件传递给任务线程。

4.一个或多个任务线程(Task threads):任务线程从事件线程中接收RTSP和RTP请求,然后把请求传递到恰当的服务器模块进行处理,把数据包发送给客户端。

一.基础功能类库(Common Utilities)

1.Darwin Streaming Server支持包括Windows,Linux以及Solaris在内的多种操作系统平台。我们知道,Windows和Unix(或Unix-like)操作系统之间无论从内核还是编程接口上都有着本质的区别,即使是Linux和Solaris,在编程接口上也大为不同。为此,DSS开发了多个用于处理时间、临界区、信号量、事件、互斥量和线程等操作系统相关的类,这些类为上层提供了统一的使用接口,但在内部却需要针对不同的操作系统采用不同的方法实现。

OSCond 状态变量的基本功能和操作,OSMutex 互斥量的基本功能和操作, OSThread 线程类,OSFileSource 简单文件类,OSQueue 队列类, OSHashTable 哈希表类,OSHeap 堆类,OSRef 参考引用类。

2.Task类,用来处理事件通知机制。

在Task.h/cpp文件中,定义了三个主要的类,分别是:任务线程池类(TaskThreadPool Class)、任务线程类(TaskThread Class)以及任务类(Task Class)。 每个Task对象有两个主要的方法:Signal和Run。当服务器希望发送一个事件给某个Task对象时,就会调用Signal()方法;而Run()方法是在Task对象获得处理该事件的时间片后运行的,服务器中的大部分工作都是在不同Task对象的Run()函数中进行的。每个Task对象的目标就是利用很小的且不会阻塞的时间片完成服务器指定某个工作。

任务线程类(TaskThread)是OSThread类的一个子类,代表专门用于运行任务类的一个线程。在每个任务线程对象内部都有一个OSQueue_Blocking类型的任务队列,存储该线程需要执行的任务。服务器调用一个任务的Signal函数,实际上就是将该任务加入到某个任务线程类的任务队列中去。

另外,为了统一管理这些任务线程,DSS还开发了任务线程池类,该类负责生成、删除以及维护内部的任务线程列表。

这种由事件去触发任务的概念已经被集成到了DSS的各个子系统中。例如,在DSS中经常将一个Task对象和一个Socket对象关联在一起,当Socket对象收到事件(通过select()函数),相对应的Task对象就会被传信(通过Signal()函数);而包含着处理代码的Run()函数就将在某个任务线程中运行。 3.Socket类

DSS中的Socket类一般都采用异步模式的(即非阻塞的),而且能够向对应的Task对象传信(Signal),Socket类中具有代表性的类是:EventContext、EventThread、Socket、UDPSocket、TCPSocket以及TCPListenerSocket等等。

在eventcontext.h/.cpp文件中,定义了两个类:EventContext类和EventThread类。 Event Context提供了检测Unix式的文件描述符(Socket就是一种文件描述符)产生的事件(通常是EV_RE 或 EV_WR)的能力,同时还可以传信指定的任务。EventThread类是OSThread类的子类,它本身很简单,只是重载了OSThread的纯虚函数Entry(),用以监控所有的Socket端口是否有数据到来。

EventContext对象负责维护指定的描述符,其主要函数包括InitNonBlocking、CleanUp和RequestEvent等。其中InitNonBlocking函数调用Socket API ioctlsocket将用户指定的描述符设置为异步,CleanUp函数用于关闭该描述符;另外,用户通过RequestEvent函数申请对该描述符中某些事件的监听

Socket Class、UDPSocket Class和TCPSocketClass三个类都是EventContext的子类,它们封装了TCP和UDP的部分实现,同时扩展了EventContext中的事件,但都没有改变其运行机制。TCPListenerSocket用于监听TCP端口,当一个新连接请求到达后,该类将赋予这个新连接一个Socket对象和一个Task对象的配对。

二.服务器模块:

处理网络和协议。主要有3个子系统:RTSP子系统,RTP子系统以及公共服务子系统。

服务器内核:这个子系统中的类都有一个QTSS前缀。 QTSSServer负责处理服务器的启动和关闭。

QTSSServerInterface负责保存服务器全局变量,以及收集服务器的各种统计信息。

QTSSPrefs是存储服务器 偏好设定的地方。

QTSSModule,QTSSModuleInterface,和QTSSCallbacks类的唯一目的就是支持QTSS的模块API。 (1) RTSP子系统

负责解析和处理RTSP请求,以及实现QTSS模块API的RTSP部分。其中的几个类直接对应QTSS API的一些元素(例如,RTSPRequestInterface类就是对应QTSS_RTSPRequestObject对象)。每一个RTSP/TCP连接都对应一个RTSP的session.主要的类有RTSPSession,RTSPRequest,RTSPResponseStream和RTSPRequestStream. (2) RTP子系统

负责媒体数据包的发送,根据RTCP的反馈进行服务质量控制。主要的类有RTPSession,RTPStream和RTCPTask. (3)公共服务子系统

负责服务器的启动/关闭,初始化参数设置以及为Module机制,跨平台的多线程和事件机制等提供支持。

DSS提供了一种称为Module的二次开发接口。使用这个开发接口,我们可以自由扩张服务器的功能。

DSS定义了一个TCPListenerSocket类的子类RTSPListenerSocket,用于监听RTSP连接请求。RTSPListenerSocket类做的唯一一件事就是重载了GetSessionTask函数,当客户的连接请求到达后,它创建了一个Socket对象和RTSPSession对象的配对。RTSPSession对象是Task类的子类,是专门用于处理RTSP请求的任务类。

当client端发出Play请求时,server端的RTSPListenerSocket监听到这个请求,Run创建一个RTSPSession,这个RTSPSession被加入到任务队列中,当时间片到达时,TaskThread线程就会调用RTSPSession对象的函数,在Run函数中,维护一个RTSPSession状态机,对客户的RTSP请求做出不同的处理。 请求分析完成后,RTSPSession进入请求处理状态(kProcessingRequest),DSS会调用注册了“请求处理任务”(QTSS_RTSTRequest_Role)的module,而

QTSSFileModule就是这样一个Module。QTSSFileModule定义了一个分发函数QTSSFileModuleDispatch,它根据传入的任务类别和任务参数调用相应的函数。此时传入的任务是QTSS_RTSTRequest_Role,相应的处理函数是ProcessRTSPRequest,该函数根据传入的RTSP Method调用相应的处理函数,此时传入的method是play,所以调用函数DoPlay。

IdleTaskThread,它的Entry 中不停的超时等待,查看它队列中的task是否到期,是则调用该task->signal.

IdleTask的SetIdleTimer()里面调用IdleTaskThread->SetIdleTimer()把该task加入它的fIdleHeap.

程序的数据流程为四部分,就是: 1,task。

每个Task对象有两个主要的方法:Signal和Run。当服务器希望发送一个事件给某个Task对象时,就会调用Signal()方法;而Run()方法是在Task对象获得处理该事件的时间片后运行的,应用可以通过继承Task 并重写Run()方法实现自己的任务。

2,EventContext事件的触发者,当事件发生时,调用Task::signal().

3,TaskThread,任务的驱动线程,对一个或者多个Task进行调度,通过调用 Task::run() 处理事件.

4,EventThread,EventContext的驱动线程,可以处理多个EventContext, 发生事件时调用EventContext::process_event(),后者将调用Task::Signal()

流程:

1,Client或者 Task的子类向Event Context注册事件。 2,EventContext将事件放入EventThread的Pool内。 3,EventThread 调用select 等待多个事件中任一个触发。

4,事件触发以后,EventThread调用 EventContext::ProcessEvent()。 5,调用 Task::signal()。

6,Task::signal()将task放入TaskThread的队列。 7,TaskThread调度相应的Task, 执行其Run()方法。它的具体实现在各个Module中定义。

DSS的基本功能单元称为模块(module),每一个模块都是处理某一类事件的功能集合。例如QTSSAcessModule就是鉴权授权事件的主要处理模块,类似于桌面窗口的事件处理中的事件,DSS服务器核心处理RTSP请求,定义了若干角色(role),一个角色就是一项任务,一个模块可以注册若干个角色,表示这个模块可以处理这些注册的任务。比如一个模块注册了QTSS_RSTPPreProcess_Role,则可以调用这个Module来预处理RSTP请求。根据不同的任务,服务器会传递不同的对象给模块,这些对象中包含有为完成这个任务所包含的一系列数据以及为向服务器传递信息而存在的数据结构。

Darwin streaming server的媒体存储格式hint track格式以及被ISO,ISMA等标准组织采纳并成为正式标准。Hint track是Darwin streaming server的扩展,通过在媒体文件中增加hint track 轨道,存放了一些预先生成好的媒体描述信息(SDP格式)和媒体数据打包的索引信息。通过ISMA传输协议进行传输时,可以直接根据数据打包索引信息来打包发送,不需要对信息进行重复的分析格式组装数据包的处理过程。

普通文件可以通过一些工具添加hint track属性。开源的工具有mpeg4ip,mp4info.

RTSP 数据处理

在同时开启多个窗口播放同一个sdp文件时,都是使用同一个ReflectorSession对象,FindOrCreateSession也不会调用SetupReflectorSession函数,即ReflectorStream、ReflectorSocket等对象也不会被再次创建。注意对于一个新的播放链接来说,RTSPSession、RTPSession都是新创建的对象,所以需要重新创建RTPSessionOutput对象。

在QTSSReflectorModule.cpp的DoSetup()函数中,由DoSessionSetup进入 FindOrCtreateSession(),建立出ReflectorSession,里面创建ReflectorStream,绑定它对于的ReflectorSocket.

ReflectorSession* FindOrCreateSession {

ReflectorSession* theSession

theSession = NEW ReflectorSession(inPath);

SourceInfo 通过读取sdp文件,对应ffmpeg输出的rtp流。 theSession->SetupReflectorSession(theInfo, inParams, theSetupFlag,sOneSSRCPerStream, sTimeoutSSRCSecs);

该函数里面:

fStreamArray = NEW ReflectorStream*[fSourceInfo->GetNumStreams()]; for (UInt32 x = 0; x GetNumStreams(); x++)

fStreamArray[x] =

NEW ReflectorStream(fSourceInfo->GetStreamInfo(x));

fStreamArray[x]->

BindSockets(inParams,inFlags, filterState, filterTimeout);

}

doSetup()中建立outPut和rtpSession,reflectorSession的关系。

RTPSessionOutput* theNewOutput = NEW RTPSessionOutput(inParams->inClientSession, theSession, sServerPrefs, sStreamCookieAttr ); theSession类型为ReflectorSession.

theSession->AddOutput(theNewOutput,true);

(void)QTSS_SetValue(inParams->inClientSession, sOutputAttr, 0, &theNewOutput, sizeof(theNewOutput)); theErr = QTSS_AddRTPStream(inParams->inClientSession, inParams->inRTSPRequest, &newStream, 0); 该函数中

*outStream = NEW RTPStream(theSSRC, this);

QTSS_ErrortheErr = (*outStream)->Setup(request, inFlags); 建立该rtpStream 和client的socket联系。

// Place the stream cookie in this stream for future reference

void* theStreamCookie =

theSession->GetStreamCookie(theTrackID); theStreamCookie 为reflectorStream.

theErr = QTSS_SetValue(newStream, sStreamCookieAttr, 0,

&theStreamCookie, sizeof(theStreamCookie));

RTPStream类有的属性有:

UDPSocketPair* fSockets;

RTPSessionInterface* fSession;

在ReflectorStream::PushPacket中,把packet拷贝到 reflectorSocket,调用ProcessPacket

// Find the appropriate ReflectorSender for this packet. ReflectorSender* theSender = (ReflectorSender*)this->

GetDemuxer()->GetTask(theRemoteAddr, 0);

theSender->fPacketQueue.EnQueue(&thePacket->fQueueElem);

在ReflectorStream.cpp的ReflectorSocket::Run()中 调用GetIncomingData()和ReflectPackets()收发包。

ReflectorSender->由reflectStream找到 ReflectorOutput->

从他对应的rtpSession找到rtpStream,判断该rtpStream 对应的reflectorStream 是否就是调用方的这个fstream.是则调用该rtpStream的write函数发送数据包。

播放一下,马上出错原因是,在请求sdp文件失败返回后,调用了fileModule的doDescribe,返回出错,再调用QTSS_Teardown,销毁rtpSession.

Signal(killEvent),在它的~RTPSessionInterface()中调用RTSPSession的DecrementObjectHolderCount()的signal(killEvent),删除rtspSession.

Udpsocket 句柄释放在eventSocket的虚构函数中close.

2.点播模块分析

1,RTPSession:Run()有

(void)fModule->CallDispatch(QTSS_RTPSendPackets_Role, &theParams); 它调用QTSSFileModuleDispatch 的SendPackets(), 2,RTSPSession:Run()有 Case kProcessingRequest:

(void)theModule->CallDispatch(QTSS_RTSPRequest_Role, &fRoleParams);调

用ProcessRTSPRequest().

QTSServer.cpp中定义好了sCallbacks.addr[]=(QTSS_Callback)QTSS_API

FileSession: 里面有

QTRTPFilefFile;

SDPSourceInfofSDPSource;

FileSession 中 fFile 类型为QTRTPFile,

在DoDescribe函数中,调用CreateQTRTPFile()函数,根据inPath,

*outFile= New FileSession(),并(*outFile)->fFile.Initialize(inPath)

FileSession类中,主要结构有QTRTPFilefFile; fAdjustedPlayTime; QTSS_PacketStructfPacketStruct;fStartTime,fStopTime;fLastRTPTime;

QTRTPFile.h中定义了 QTFile * fFile,

theSDPData.Ptr = theFile->fFile.GetSDPFile(&sdpLen);

QTRTPFile::ErrorCodeqtfileErr = theFile->fFile.AddTrack(theTrackID, true);//都是调用了qtRTPFile里面解析数据的函数。

在doDescribe()的最后,

theFile->fSDPSource.Parse(theSDPData.Ptr, theSDPData.Len);

SourceInfo::StreamInfo* theStreamInfo = theFile->fSDPSource.GetStreamInfo(x);

rtpSession的功能:一1对应于rtspSession,里面有2个rtpStream,

在killEvent中对所有注册了QTSS_ClientSessionClosing_Role角色的模块调用(void)theModule->CallDispatch(QTSS_ClientSessionClosing_Role, &theParams);

里面最重要的fModule在rtspSession的run中给它赋值,让他对应上fRTPSession.即fRTPSession->SetPacketSendingModule(theModule);

IdleoutTask,

IdleTask::Initialize()在StartServer()函数中调用,它创建出sIdleThread. ReflectSocket继承自IdleoutTask

TimeoutTask,初始化时传入task对象指针,它会加入到TimeoutTaskThread对

象的队列,该静态对象task在StartServer()函数中初始化,TimeoutTask::Initialize().

TimeoutTask的构造函数会把它加入到TimeoutTaskThread的队列中,该task的run函数会调用它队列中的所有元素,判断是否超时,调用

theTimeoutTask->fTask->Signal(Task::kTimeoutEvent);

RTPSessionInterface类中有一个成员变量TimeoutTaskfTimeoutTask;

RTSPSessionInterface类中也有TimeoutTaskfTimeoutTask;

四.实时转码和直播模块的关系

在trunk/APIModules/QTSSReflectorModule/模块下添加了

ReflecotrFfmpeg.cpp,ReflectorFfmpeg.h两个独立文件。用来处理darwin调用ffmpeg实现转码的功能。由ffmpeg模拟了直播模式的产生数据流功能,生成sdp描述文档和rtp的音频流和视频流。

具体替换了QTSSReflectorModule.cpp中的

1,DoSessionSetup()为DoSessionSetupWithFfmpeg()

在FindOrCreateSession()中创建出ReflectorFfmpeg对象。 ReflectorFfmpeg* theFfmpeg=NEW ReflectorFfmpeg();

if((!theFfmpeg)||( theFfmpeg->SetupFfmpeg(inPath,inParams)!=0)) 开始实时转码。

本文来源:https://www.bwwdw.com/article/484v.html

Top