User talk:X-dsl

From Free Pascal wiki
Jump to navigationJump to search

Lazarus下Epoll代码的编写

   Following content writed by Simplified han,if you have any question,you can send email to 

x-dsl@139.com

   了解Epoll是08年的事了,当时知道它是Linux2.6下最强的网络模型,至于有多强,有人说是能支持数10万的并发连接,有人说是

数万。但普遍得出的结论就是:EPOLL模型不会随并发连接数的增加而导致性能的线型下降。由于自己一直在Redhat9下用 Kylix3做开发,内核条件不符合,所以没办法测试。

   在权哥仔的诱惑下,终于在10年初安装了uBuntu系统,作为平时的工作桌面。效果总体来说不错,起码不用装什么杀毒软件。下面先介绍一下开发环境
   操作系统:uBuntu9.10 
   开发工具:Lazarus IDE v0.9.28.2-0 beta 
   开发内容:Epoll模型的ECHO服务
   测试环境:Intel(R) Core(TM)2 Duo CPU     E8400
           4G ddr2
           Rhel5.4
   Epoll的介绍和应用,在网上是比较普遍的了,其推出已有相当长一段时间,目前没发现比较致命的Bug,是一套比较稳定可靠的网络

模型。长期以来,开发EPoll的程序员一般都是使用C,后来java的jre更新了以后,Java程序员也可以使用到Epoll模型了,这对JAVA 程序员来说是一大福音。但对Kylix或者Lazarus程序员来说,就比较致命,因为Kylix无法在2.6的操作系统下运行,所以决定它不能用 于Epoll的开发.而Lazarus是一套开源的软件,现在还不知道有没有赞助商,所以文档、更新方面都比较慢。大部分文档都是民间程序员 写的,官方没有比较权威的文档,只有Bug列表。可能Lazarus程序员本身就开发应用层或者数据库方面比较多,所以国内外一直没有关于 Epoll的FreePascal代码出现。怀着对Epoll的好奇和敬仰,大蛇明参考了网上的C代码,并在lazarus上得以实现。测试效果让人非常 满意

   在4000个并发连接的状态下,echo的CPU占用率不到0.3%,内存占用不到0.01%。道理上,并发连接数只受限于硬件配置,据不确切

消息,大概1G内存可以支持10万个并发连接。一下为echo服务的所有代码,希望对所有Lazarus开发人员有帮助

//_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_ //echo server application //use epoll //by neo //2010-03-25 //_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_ program epoll_server;

{$mode objfpc}{$H+}

uses

 {$IFDEF UNIX}{$IFDEF UseCThreads}
 cthreads,
 {$ENDIF}{$ENDIF}
 Classes, SysUtils, CustApp, Linux, LibC
 { you can add units after this };

type

 { TEPoll_Server }
 TEPoll_Server = class(TCustomApplication)
 protected
   procedure DoRun; override;
   function setnonblocking(sockfd :Integer) :Integer;
   function handle_message(new_fd :Integer) :Integer;
 public
   constructor Create(TheOwner: TComponent); override;
   destructor Destroy; override;
 end;

{ TEPoll_Server } const

 MAXBUF       = 1024;
 MAXEPOLLSIZE = 10000;

function TEPoll_Server.setnonblocking(sockfd :Integer) :Integer; var

 iResult :Integer;

begin

 iResult := fcntl(sockfd, F_SETFL, O_NONBLOCK);
 Result  := iResult;

end;


//handle_message - 处理每个 socket 上的消息收发 function TEPoll_Server.handle_message(new_fd :Integer) :Integer; var

 buf :Array[0..MAXBUF-1] of Char;
 len :Integer;
 sTmp:String;

begin

   //* 开始处理每个新连接上的数据收发 */
   memset(@buf, 0, MAXBUF);
   len := LibC.Recv(new_fd,buf[0],MAXBUF,0);
   //* 接收客户端的消息 */
   //len = recv(new_fd, buf, MAXBUF, 0);
   if (len > 0) then
   begin
     SetLength(sTMp,len);
     Move(buf[0],sTmp[1],len);
     Writeln('消息 : 句柄,' + IntToStr(new_fd) + '>>>' + Trim(sTmp));
     if libc.send(new_fd,sTmp[1],len,0) <> len then
       Writeln('错误 : 消息发送到[' + IntToStr(new_fd) + ']失败,' + IntToStr(ErrNO) + ',' + StrError(ErrNO));
   end
   else if (len < 0) then
   begin
     ShowException(Exception.Create('错误 : 消息接收失败:' + IntToStr(ErrNO) + ',' + StrError(ErrNO)));
     __close(new_fd);
     Result := -1
   end
   else if (len = 0) then
   begin
     Writeln('消息 : 句柄,' + IntToStr(new_fd) + '已断开与服务器的链接.');
     __close(new_fd);
     Result := -1
   end;
   //* 处理每个新连接上的数据收发结束 */
   Result := len;

end;

procedure TEPoll_Server.DoRun; var

 ErrorMsg: String;
 my_addr, their_addr :sockaddr_in;
 listener, new_fd, kdpfd, nfds, n, ret, curfds :Integer;
 len     :Integer;
 myport  :Integer;
 ev      :epoll_event;
 events  :array [0..MAXEPOLLSIZE-1] of epoll_event;
 rt      :rlimit;
 sTmp    :String;

begin

 // quick check parameters
 ErrorMsg:=CheckOptions('p','port');
 if ErrorMsg<> then begin
   ShowException(Exception.Create(ErrorMsg));
   Terminate;
   Exit;
 end;
 //获取参数  -p 端口号
 sTmp   := Trim(GetOptionValue('p'));
 myport := -1;
 myport := StrToIntDef(sTmp, -1);
 if (myPort < $0001) or (myPort > $FFFF) then
 begin
   ShowException(Exception.Create('错误 : 无效的端口号:' + sTmp));
   Terminate;
   Exit;
 end;


 //设置每个进程允许打开的最大文件数
 rt.rlim_max := MAXEPOLLSIZE;
 rt.rlim_cur := MAXEPOLLSIZE;
 if (setrlimit(RLIMIT_NOFILE, @rt) = -1) then
 begin
   ShowException(Exception.Create('错误 : 设置系统资源参数失败:' + IntToStr(ErrNO) + ',' + StrError(ErrNO)));
   Terminate;
   Exit;
 end
 else
   Writeln('信息 : 设置系统资源参数成功!');
 //创建文件句柄
 listener := socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
 if listener =  SOCKET_ERROR then
 begin
   ShowException(Exception.Create('错误 : 创建Socket句柄失败:' + IntToStr(ErrNO) + ',' + StrError(ErrNO)));
   Terminate;
   Exit;
 end;
 //设置非堵塞模式
 if setnonblocking(listener) =  SOCKET_ERROR then
 begin
   ShowException(Exception.Create('错误 : 设置非堵塞模式失败:' + IntToStr(ErrNO) + ',' + StrError(ErrNO)));
   Terminate;
   Exit;
 end;
 //绑定地址与端口
 memset(@my_addr, 0, SizeOf(my_addr));
 my_addr.sa_family := AF_INET;
 my_addr.sin_port  := htons(myPort);
 if LibC.bind(listener, my_addr, SizeOf(my_addr)) = SOCKET_ERROR then
 begin
   ShowException(Exception.Create('错误 : 绑定地址与端口失败:' + IntToStr(ErrNO) + ',' + StrError(ErrNO)));
   Terminate;
   Exit;
 end;
 // 监听失败
 if LibC.listen(listener, 128) = SOCKET_ERROR then
 begin
   ShowException(Exception.Create('错误 : 监听失败:' + IntToStr(ErrNO) + ',' + StrError(ErrNO)));
   Terminate;
   Exit;
 end;


 //创建EPOLL
 kdpfd      := epoll_create(MAXEPOLLSIZE);
 len        := SizeOf(sockaddr_in);
 ev.events  := EPOLLIN or EPOLLET;
 ev.data.fd := listener;
 if epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener, @ev) = SOCKET_ERROR then
 begin
   ShowException(Exception.Create('错误 : 创建EPOLL失败:' + IntToStr(ErrNO) + ',' + StrError(ErrNO)));
   Terminate;
   Exit;
 end;
 curfds     := 1;
 writeln('信息 : 服务已启动,端口 : ' + IntToStr(myPort));
 while true do
 begin
   //等待有事件发生
   nfds := epoll_wait(kdpfd, events, curfds, -1);
   if (nfds = -1) then
   begin
     ShowException(Exception.Create('错误 : epoll_wait失败:' + IntToStr(ErrNO) + ',' + StrError(ErrNO)));
     //break;
   end;
   //处理所有事件 */
   for n := 0 to nfds -1 do
   begin
     if (events[n].data.fd = listener) then
     begin
       new_fd := accept(listener, @their_addr, @len);
       if (new_fd < 0) then
       begin
         ShowException(Exception.Create('错误 : 接入传入连接时发生:' + IntToStr(ErrNO) + ',' + StrError(ErrNO)));
         continue;
       end
       else
       begin
         Writeln('信息 : [' + inet_ntoa(their_addr.sin_addr) + ':' + IntToStr(ntohs(their_addr.sin_port)) + ']连接到服务器,句柄:' + IntToStr(new_fd));
         setnonblocking(new_fd);
         ev.events  := EPOLLIN or EPOLLET;
         ev.data.fd := new_fd;
         if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, new_fd, @ev) = SOCKET_ERROR) then
         begin
           Writeln('错误 : 把Socket' + IntToStr(new_fd) + '加入 epoll 失败!,' + IntToStr(ErrNO) + StrError(ErrNO));
           __close(new_fd);
         end;


       end;
     end
     else
     begin
       ret := handle_message(events[n].data.fd);
       //座席断开
       if (ret = SOCKET_ERROR) and (ErrNO <> EAGAIN) then  // (errno <> 11)
       begin
         epoll_ctl(kdpfd, EPOLL_CTL_DEL, events[n].data.fd,@ev);
         //curfds := curfds - 1;
       end;
     end;
   end;


 end;
 __close(listener);
 Terminate;

end;

constructor TEPoll_Server.Create(TheOwner: TComponent); begin

 inherited Create(TheOwner);
 StopOnException:=True;

end;

destructor TEPoll_Server.Destroy; begin

 inherited Destroy;

end;


var

 Application: TEPoll_Server;

{$IFDEF WINDOWS}{$R epoll_server.rc}{$ENDIF}

begin

 Application:=TEPoll_Server.Create(nil);
 Application.Title:='TEPoll_Server';
 Application.Run;
 Application.Free;

end.