基于内存缓冲区的流媒体数据缓存

更新时间:2024-04-03 17:50:01 阅读量: 综合文库 文档下载

说明:文章内容仅供预览,部分内容可能不全。下载后的文档,内容与下面显示的完全一致。下载之前请确认下面内容是否您想要的,是否完整无缺。

基于内存缓冲区的流媒体数据缓存

姓名:常双举 学号:7120310418 班级:1240009

2014年7月18日

基于内存缓冲区的流媒体数据缓存

常双举 720310418 124009

摘要:在有了一个对流媒体数据的深刻理解的情况下,剖析缓冲区建立的多种困难和情况,

并给出解决问题的想法和算法,使构建出的缓冲区可以接收流媒体数据。

一、题目描述

流媒体是指以流的方式在网络中传输音频、视频和多媒体文件的形式,流式传输方式是将视频和音频等多媒体文件经过特殊的压缩方式分成一个个的压缩包,由服务器向用户计算机连续、实时传送;在采用流式传输方式的系统中,用户不必像非流式播放那样等到整个文件下载完毕后才能看到当中的内容,而是只需要经过几秒钟或几十秒的启动延时即可在用户计算机上利用相应的播放器对压缩的视频或音频等流式媒体文件进行播放,剩余的部分将继续进行下载,直至播放完毕;从这里看出流媒体数据是一边下载一边输出的;而且这个过程的一系列相关的包称为“流”,而流式传输的实现需要缓存,因为Internet以包传输为基础进行断续的异步传输;

这里就可以知道整个实验的大致流程为服务器以数据包的形式像用户传送数据,即用户接收到的一个个的数据包,而又因为流式传输的需要,用户需要建立缓存区,即题目中的内存缓冲区,用户的缓冲区接收到服务器发送的数据包,将其写入缓冲区处理好,整理出正确的数据输出给用户;整个过程是实时的,即缓冲区是一边接收数据包,一边输出给用户的。 二、问题分析

清楚了整个实验的流程,有几个问题是必须要清楚的;一是服务器发送给缓冲区的数据包的顺序问题,是服务器顺序切割文件产生的顺序数据包还是多线程下载产生的乱序的数据包?二是服务器在传送数据包的情况下,有没有可能因为一些因素导致某些数据包发送失败,从而产生数据包丢失的问题?三是假如服务器接收的是多用户请求,那么传送的数据包对于用户来说是否是重复的呢?同时,由于用户可以同时下载多个文件,多个节目,假如某一用户向服务器发送多节目下载请求,缓冲区接收的数据包岂不是具有不同节目ID的数据包?

除去以上的数据包的属性问题,还有缓冲区大小问题和用户需求问题; 缓冲区大小问题是指缓冲区作为一个数据的中转站,不可能具有特别大的一块内存供其使用,这不符合缓冲区的缓冲性质,因此需要对缓冲区设定大小限制,例如,要求缓冲区占用必须在512K以内;

用户需求问题是指用户在使用数据时,要求从缓冲区中拿到的数据可以显示出一帧的画面或者一段语音,否则对于用户来说,这段数据是不能够使用的,因此需要设定一个输出要求,即对缓冲区可输出数据量的最低输出要求,例如设定缓冲区中可用数据大于1k时,缓冲区方可输出数据。

针对以上这些问题,我们需要有一个大概解决这些问题的思路;首先针对数据包的属性问题,可以设计一个基本的缓冲区,缓冲区可以接收顺序单线程、无丢失、无重复和单节目ID的数据包,然后再根据接收到数据包的属性复杂性,依次为缓冲区添加功能,从而达到处理数据包的目的,但是在沿着这个思路思考的时候,发现有几个问题是不能解决的,首先多用户下载的重复包问题是不能进入缓冲区的,还有多节目ID的数据包是不可能依靠一个缓冲区来处理的,另外缓冲区的内存限制和用户需求都需要辅助函数来解决,所以,解决以上所有问题一个简单的缓冲区

类是不够的,我们需要设计一个功能更加复杂的节目类! 三、解决方案

依据以上对问题的发现和探讨,我们大体的思路基本确定了,即设计一个功能完全的封装的节目类来接收服务器发送的多属性数据包,并输出数据供用户使用,输出的数据存放在目的文件中,以表示用户使用过的数据; 1、缓冲区设计

数据结构:结构体b,结构体中包含offset-偏移量、judge-判断是否该存储体为

输出源和arra-vector动态数组模板类以及用来指向下一个结构体的next结构体指针;

缓冲区说明:动态链表的连续动态存储体,即可以申请指针相连的存储结构体,

存储结构体中有vector数组动态存储;

框图表示: int offset; int offset; int offset;

int judge; int judge; int judge; next next vector vector vector arra; arra; arra; 数据区 数据区 数据区 2、乱序数据包处理

算法描述:

乱序到来的数据包可以分为以下几种情况;

1、 数据包写在某个存储体的末尾,要求存储体末尾元素与数据包首元

素相接,判断条件offset+arra.length=indataoffset;在写完后,要查验在接收这个数据包后,是否下个存储体可以连接在当前存储体的末尾,判断条件offset+arra.length=next->offset;连接后,要注意调整指针,循环查验直到条件不满足;

2、 数据包写在某个存储体的开始,要求数据包的末尾元素与存储体首

元素相接,判断条件indataoffset+baglength=offset;

3、 数据包写在第一个存储体之前或者写在某两个存储体之间(即某个

存储体之前),要求某个存储体的首位元素的偏移量大于与数据包末尾元素相接的元素的偏移量,判断条件offset>indataoffset+baglength;申请新的存储体接收这个数据包,申请后注意调整指针;

4、 数据包写在整个当前缓冲区的后面,要求与当前缓冲区最后一个存

储体的末尾元素相接的元素的偏移量小于数据包首位元素的偏移量,判断条件offset+arra.length

算法框图: 开始

offset+arra.length=indataoffset? 是 否

数据包写在当前存储体末尾

offset+arra.length=next->offset? 否

是 相邻存储体连接为一个存储体 next=NULL? 否 是 indataoffset+baglength=offset? 否 是 数据包写在存储体开始 offset>indataoffset+baglength? 是 否 申请新的存储体,第一个存储体或者两个存储体之间,数据包写在新的存储体

offset>indataoffset+

baglength?

是 否

申请新的存储体,

缓冲区的最后,数

据包写在新的存储

更新存储体指针 否 当前存储体指针

helpone=NULL?

结束

3、丢失数据包和缓冲区大小限制处理 算法描述:

丢失的数据包造成缓冲区中的存储体永远不能连接成一个存储体,故

导致remaindata函数返回的缓冲区可用数据量仅为缓冲区中一部分的数据量,而如果长期等待丢失的数据包,就会造成节目后部分数据堆积,从而导致缓冲区达到限制大小512K;即512k为缓冲区的最大等待“时间”;

需要解决两个问题:

1、 缓冲区大小限制问题;解决方法:每个数据包在接入缓冲区前都要利

用computer函数计算,假如数据包进入缓冲区是否会造成缓冲区溢出,如果会造成缓冲区溢出,就要调用forceout强制输出函数,重复这个过程直到数据包可以“顺利”进入缓冲区;

2、 如果数据是假丢失,即多线程,即数据包经过很长的时间才到来,此

时,缓冲区已经强制输出了这个数据包之后的数据;解决方法:记住输出到目的文件中的第一个数据的偏移量和当前目的文件中最会一个数据的偏移量,分别为original和endnumber,在数据包进入缓冲区之前,要判断数据包的indataoffset是否在original和endnumber

之间,如果在之间,就丢弃这个数据包,因为这个数据包已经是无效的。

算法框图:

开始 baglength>512K? 否 original1? 否 force函数强制输出缓冲区数据且更新original、endnumber值 结束 4、重复数据包处理 算法描述:

重复的数据包对于单用户来说,是无效的,而如果多用户就要发给不

同的文件,而对于其中的每个文件来说,重复的文件也是无效的,所以,决定对于重复的数据包只做丢弃处理,原理上是没有错误的;

解决方法:设置vector barrier数组,用来记录所有已经进入缓冲区的数

据包,而记录的元素为数据包的特征值indataoffset+baglength; 每个数 据包在进入缓冲区前都要调用fence“栅栏”函数进行特征值比较,如果 比较成功,则拦截成功,否则barrier数组记录这个数据包的特征值。

算法框图: 开始

计算数据包特征值:

offset=indataoffset+baglenth;

为数据包最后一个数据的偏

移量 fence栅栏函数比较特征值

return=1?

是 否 数据包进入缓冲区

结束

5、多节目ID数据包处理 算法描述:

即每个数据包都带有节目信息ID;节目类需要进行三个操作,一是

数据包未传送完毕,节目类接收数据包;二是数据包传送完毕,节目类进行缓冲区的清空;三是节目类内存空间的释放。

1、 节目类接收数据包:节目类需要判断此数据包的ID信息是否和当

前类对象的ID信息符合,符合则写在当前类对象所拥有的缓冲区

中,否则查看当前类的指针私有变量是否为NULL,如果非空,就把数据包传送给下一个类对象进行比较;如果为NULL值,就要启动一个新类,新类向缓冲池申请新的缓冲区,如果申请失败,则数据包存储失败,如果申请成功,就将数据包写入新类的缓冲区; 2、 数据包传送完毕:传送完毕时,节目类再次接收到最后一个数据包,

但是decision变为1,即通知节目类数据包传送完毕,当前节目类首先调用clearremainer函数输出缓冲区剩余数据,如果其指针私有变量next为非空,则传递decision为1这个信息,如果为空,则结束;

3、 节目类内存空间的释放:由于设计中的类的应用是用指针加new实

现的,故需要用delete来调用类的析构函数,而delete只能释放类型定义变量,不能释放类内部new出的内存,也不能关闭文件,故需要在析构函数~program中关闭文件、释放当前缓冲区的内存;另如果指针私有变量next为非空,则delete下一个类,启动下一个类的析构函数,如果为空,则结束。

算法框图:

开始

bagnumber=1?

new初节目类original; origianl.ID=databag.ID

初节目类original调用

printdata函数接收

databag

decision=1? 当前节目类调用 clearremainer函数输出缓冲区剩余数据; next=NULL?

当前类 .ID=databag.ID? next=NULL? new新类; 新类调用printdata函数接收数据包 当前类调用printdata函数接收数据包 结束 6、用户需求处理 算法描述:

用户需求问题在整个数据传送过程中,都要加以处理的,即如果缓冲区

进行数据输出,那么缓冲区一次性的数据数据输出量是要满足用户要求的,即一次性的输出量要大于1k;而在整个实验中,有三个位置,缓冲区是进行数据输出的,一是在顺序输出printdata函数中,缓冲区是边接收边输出的,每次输出的一定是连续的数据,且一次性的输出量大于1K;二是在forceout强制输出函数中,由于forceout函数的主要目的并不是让用户使用当前的数据,而是为了容纳当前数据包,因此,forceout函数是优先输出任意大小连续数据块,如果不能找到连续的数据块,那么就输出次连续的数据块;三是在clearremainer输出剩余数据函数中,因为没有了数据输入,因此无论缓冲区中有多少数据,大于1K或者小于1K,无论剩余数据是否连续,都是要一

次性输出。

注:连续数据是指此数据块的首元素的偏移量offset=endnumber。

顺序输出算法框图:

开始 根据endnumber 调用remaindata函数 返回缓冲区可用的连续数据量 remainer>1K? 是 输出useddata-用户使用量 数据量 清除缓冲区useddata的数据 更新outdataoffset和endnumber 结束 clearremainer算法框图:

开始

将所有数据聚集在缓冲区buffer

的第一个存储体中

输出并清除第一个存储体的数据

结束

7、总算法框图

开始

数据包到来

多节目处理

decision=1?

数据包传送结束?

clearremainer函数

丢失和缓冲区大小限制

处理

重复数据包处理

乱序和多线程处理

顺序输出函数

delete类

结束 四、实验代码

#include #include #include #include #include #include #include using namespace std;

const int maxbaglength=1500; struct databag{ int length;

int indataoffset; int ID;

char data[maxbaglength];};//数据包,备注:节目类接收的数据包是具有乱序、丢失、重复和多节目ID的属性的

struct b{

vector arra; int offset;

b* next;//指向下一个存储体

int judge;};//judge是一个判断该存储体是否被选中作为数据输出的源。为1,则被选中,每次输出完毕后,清除标志

const int programnumber=6;//存储池池位

b* bufferpond[programnumber];//构建缓冲区池,用来为每个节目分配缓冲区

class program{ public:

program();

void printdata(databag* bag); void clearremainer();

int computer(int baglength); int forceout(int baglength); int fence(int offset);

void receivedata(int indataoffset,int length,char* databag); int remaindata(int decision,char* &poutdata); void cleardata(int useddata); ~program(); private:

FILE* targetfilepoint;

b* buffer;

vector originalone; vector originaltwo;

int endnumber;//用来阻挡已经不需要的数据包

vector barrier;//栅栏函数处理重复数据包 int outdataoffset;//目的文件中的数据量 int ID;

program* next;};

program::program(int bagID) {

char targetfilename[500];

for(int i=0;i

if(bufferpond[i]==NULL) {

bufferpond[i]=new b; buffer=bufferpond[i]; buffer->judge=0; buffer->next=NULL; buffer->offset=0;

cout<<\存储区分配成功!\

cout<<\输入%d节目的目的文件名:\ cin>>targetfilename;

targetfilepoint=fopen(targetfilename,\ if(targetfilepoint==NULL) {

cout<<\目的文件创建失败!\ return;

}//初始化目的文件指针 break; }

if((i==programnumber-1)) {

cout<<\存储池已满!\ cout<<\新类创建失败!\ return; }

}//存储池为该节目类分配缓冲区 endnumber=0;

outdataoffset=NULL; ID=bagID;

next=NULL;

}//初始化类

void program::printdata(databag* bag,bool decision) {

if(decision==1) {

if(next!=NULL)

next->printdata(bag,decision); clearremainer() ; return;

}//数据包接收结束,输出当前类的剩余数据,并通知其他的类输出剩余数据 if(bag->ID!=ID) {

if(next!=NULL)

next->printdata(bag); else {

next=new program(bag->ID); next->printdata(databag* bag); }

}//数据包接收,比较数据包ID,如果现有的类可以接收这个ID的数据包,那么接收;否则在有池位的情况下,启动新的类

int offset=0; int remainer=0;

char* poutdata=NULL; int useddata;

if(bag->length>0) {

offset=bag->length+bag->indataoffset;

if(originalone.size()==1||originaltwo.size()==1) {

if(originalone.size()==1) {

if(bag->indataoffset>originalone[0]&&bag->indataoffset

if(originaltwo.size()==1) {

if(bag->indataoffset>originaltwo[0]&&bag->indataoffset

} }

//拦截目的文件中已不再需要的数据包 if(fence(offset)==1)

continue;//栅栏拦截成功,重复数据包丢弃 if(bag->length>512*1024)

continue;//如果数据包长度大于缓冲区限制,丢弃 while(computer(bag->length)>=1) {

if(forceout(bag->length))==0) break; }

//计算缓冲区数据量,如果当前数据包插入会溢出(512K),那么执行强制输出

receivedata(bag->indataoffset,bag->length,bag->data); remainer=remaindata(1,poutdata); if(remainer>1024) {

useddata=remainer-100;

fseek(targetfilepoint,outdataoffset,SEEK_SET); fwrite(poutdata,useddata,1,targetfilepoint); outdataoffset=outdataoffset+useddata; endnumber=endnumber+useddata; cleardata(useddata); } }

}//当文件没有读取完毕时,同时进行文件输出函数

void program::clearremainer() {

int remainer; char* poutdata; b* help;

help=buffer->next; while(help!=NULL) {

if(help->arra.size()!=0) {

for(int i=0;iarra.size();i++)

buffer->arra.push_back(help->arra[i]); help->arra.clear(); }

help=help->next;

} //将缓冲区中的数据聚集到一个存储体中 remainer=remaindata(0,poutdata);

fseek(targetfilepoint,outdataoffset,SEEK_SET); fwrite(poutdata,remainer,1,targetfilepoint); outdataoffset=outdataoffset+remainer; cleardata(remainer);

}//输出缓冲区剩余数据函数;处理尾数据不需要endnumber的更新

int program::computer(int baglength){ int totaldata=0; int temp; b* help; help=buffer;

while(help!=NULL) {

totaldata=totaldata+help->arra.size(); help=help->next; }

temp=512*1024-totaldata; if(temp>=baglength) return 0;

return (baglength-temp); }//judge缓冲区大小函数

int program::forceout(int baglength) {

int remainer; int judge;

char* poutdata=NULL; b* help=NULL; judge=0; help=buffer;

while(help!=NULL) {

if((endnumber==help->offset)&&(help->arra.size()!=0)) {

help->judge=1;

poutdata=help->arra.data(); remainer=help->arra.size();

endnumber=endnumber+remainer; if(originaltwo.size()<1)

originaltwo.push_back(help->offset);//获取疑似第一个输出到目的文件中的数据包的首元素偏移量

judge=1;break; }

help=help->next; }//输出连续数据 if(judge==1) {

fseek(targetfilepoint,outdataoffset,SEEK_SET); fwrite(poutdata,remainer,1,targetfilepoint); outdataoffset=outdataoffset+remainer; cleardata(remainer); return 1;

}//如果连续数据存储体标记成功,数据输出 help=buffer;

while(help!=NULL) {

if(help->arra.size()!=0) {

help->judge=1;

poutdata=help->arra.data(); remainer=help->arra.size();

endnumber=help->offset+remainer; if(originaltwo.size()<1)

originaltwo.push_back(help->offset);//获取疑似第一个输出到目的文件中的数据包的首元素偏移量 judge=1;break; }

help=help->next; }

if(judge==1) {

fseek(targetfilepoint,outdataoffset,SEEK_SET); fwrite(poutdata,remainer,1,targetfilepoint); outdataoffset=outdataoffset+remainer; cleardata(remainer); return 1;

}//如果不连续数据存储体标记成功,数据输出 return 0; }//强制输出函数

void program::receivedata(int indataoffset,int length,char* databag){ b* helpone;b* helptwo;int totalone,totaltwo; helptwo=buffer;

helpone=buffer;//初始化两个辅助指针 while(helpone!=NULL)

{

totalone=helpone->arra.size()+helpone->offset;//计算当前存储体下一个数据的偏移量 totaltwo=length+indataoffset;//计算当前数据包下一个数据的偏移量 if(indataoffset==totalone) {

b* follow;follow=helpone->next; int total=0;

for(int i=0;i

helpone->arra.push_back(databag[i]); total=helpone->arra.size()+helpone->offset; if(follow!=NULL) {

while(total==follow->offset) {

for(unsigned int i=0;iarra.size();i++)

helpone->arra.push_back(follow->arra[i]); helpone->next=follow->next; delete follow;

follow=helpone->next;

total=helpone->arra.size()+helpone->offset; if(follow==NULL) break; }

}//将缓冲区中可以连接的存储体,进行连接 return;

}//写在当前存储体末尾

if(totaltwo==helpone->offset) {

vector temp;

for(unsigned int i=0;iarra.size();i++) temp.push_back(helpone->arra[i]); helpone->arra.clear(); for(int i=0;i

helpone->arra.push_back(databag[i]); for(unsigned int i=0;iarra.push_back(temp[i]); temp.clear();

helpone->offset=indataoffset; return;

}//写在当前存储体开始

if(totaltwooffset) {

b* t; t=new b;

for(int i=0;i

t->arra.push_back(databag[i]); t->offset=indataoffset; t->judge=0;

if(helpone!=buffer) {

helptwo->next=t; t->next=helpone; } else {

t->next=helpone; buffer=t; }

return;

}//如果到来的数据是在第一个存储体前,或者到来的数据在两个存储体之间。那么申请一个新的存储体,并调整好指针

if(indataoffset>totalone&&helpone->next==NULL) {

b* t; t=new b;

for(iniaot i=0;i

t->arra.push_back(databag[i]); t->next=NULL;

t->offset=indataoffset; t->judge=0;

helpone->next=t; return;

}//如果当前数据包不能写入当前缓冲区的连续数据区,则申请新的存储体,将数据包写入新的存储体

helptwo=helpone;//记住当前指针的上一个指针 helpone=helpone->next;//更新当前指针 }

} //数据接收函数

int program::remaindata(int decision,char* &poutdata){ poutdata=NULL; b* help; help=buffer;

while(help!=NULL){

if((endnumber==help->offset)&&(help->arra.size()!=0)) {

if((decision==1&&help->arra.size()>1024)||(decision==0)) {

help->judge=1;

if(originalone.size()<1)

originalone.push_back(help->offset);//获取疑似第一个输出到目的文件中的数据包的首元素偏移量

}//在两种情况下该存储体被确定为输出源,一为文件没有读取完毕,存储体的数据量大于1K,二为文件读取完毕 poutdata=help->arra.data(); return (help->arra.size()); }

help=help->next;

}//该存储体中的数据与目标文件,为连续数据

help=buffer;

while(help!=NULL) {

if(help->arra.size()!=0&&decision==0)

{//选择不连续的时间,为读取完文件后 help->judge=1;

poutdata=help->arra.data(); return (help->arra.size()); }

help=help->next;

}//如果找不到连续的数据,那么寻找偏移量最小的存储体,选中这个存储体 return 0;//如果缓冲区中没有数据,那么返回0值 }//计算缓冲区数据剩余量函数

void program::cleardata(int useddata){ b* help;

help=buffer;vector temp; while(help!=NULL) {

if(help->judge==1)

{//寻找作为源的存储体

help->judge=0;//消除该源的标记

for(unsigned int i=useddata;iarra.size();i++) temp.push_back(help->arra[i]); help->arra.clear();

for(unsigned int i=0;i

help->arra.push_back(temp[i]);//更新存储体中的数据 temp.clear();

help->offset=help->offset+useddata;//更新存储体的偏移量 return; }

help=help->next; }

}//数据清除函数

int program::fence(int offset) {

for(unsigned int i=0;i

barrier.push_back(offset); return 0;

}//栅栏函数

program::~program() {

fclose(targetfilepoint); b* help=buffer; while(help!=NULL) {

delete help; help=help->next; }

if(next!=NULL) delete next;

}//析构函数

int main() {

for(int i=0;i

bufferpond[i]=NULL;//初始化存储池,只有programnumber个数池位可以使用 int bagnumber=5000;//数据包数 program* original=NULL; for(int i=0;i

if(i==0)

original=new program(bag->ID);//存储池至少有一个池位 original->printdata(bag,0); }//接收并处理数据包 original->printdata(bag,1);

delete original; //接收完毕,缓冲区输出剩余数据;delete类指针,不能释放内部动态申请的空间,也不能关闭指针,可以清除vector动态类

return 0; }

签字:

本文来源:https://www.bwwdw.com/article/et2r.html

Top