User talk:X-dsl
Lazarus下Epoll代码的编写
Following content writed by Simplified han,if you have any question,you can send email to
x-dsl@139.com
了解Epoll是08年的事了,当时知道它是Linux2.6下最强的网络模型,至于有多强,有人说是能支持数10万的并发连接,有人说是
数万。但普遍得出的结论就是:EPOLL模型不会随并发连接数的增加而导致性能的线型下降。由于自己一直在Redhat9下用 Kylix3做开发,内核条件不符合,所以没办法测试。
在权哥仔的诱惑下,终于在10年初安装了uBuntu系统,作为平时的工作桌面。效果总体来说不错,起码不用装什么杀毒软件。下面先介绍一下开发环境 操作系统:uBuntu9.10 开发工具:Lazarus IDE v0.9.28.2-0 beta 开发内容:Epoll模型的ECHO服务 测试环境:Intel(R) Core(TM)2 Duo CPU E8400 4G ddr2 Rhel5.4 Epoll的介绍和应用,在网上是比较普遍的了,其推出已有相当长一段时间,目前没发现比较致命的Bug,是一套比较稳定可靠的网络
模型。长期以来,开发EPoll的程序员一般都是使用C,后来java的jre更新了以后,Java程序员也可以使用到Epoll模型了,这对JAVA 程序员来说是一大福音。但对Kylix或者Lazarus程序员来说,就比较致命,因为Kylix无法在2.6的操作系统下运行,所以决定它不能用 于Epoll的开发.而Lazarus是一套开源的软件,现在还不知道有没有赞助商,所以文档、更新方面都比较慢。大部分文档都是民间程序员 写的,官方没有比较权威的文档,只有Bug列表。可能Lazarus程序员本身就开发应用层或者数据库方面比较多,所以国内外一直没有关于 Epoll的FreePascal代码出现。怀着对Epoll的好奇和敬仰,大蛇明参考了网上的C代码,并在lazarus上得以实现。测试效果让人非常 满意
在4000个并发连接的状态下,echo的CPU占用率不到0.3%,内存占用不到0.01%。道理上,并发连接数只受限于硬件配置,据不确切
消息,大概1G内存可以支持10万个并发连接。一下为echo服务的所有代码,希望对所有Lazarus开发人员有帮助
//_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_ //echo server application //use epoll //by neo //2010-03-25 //_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_ program epoll_server;
{$mode objfpc}{$H+}
uses
{$IFDEF UNIX}{$IFDEF UseCThreads} cthreads, {$ENDIF}{$ENDIF} Classes, SysUtils, CustApp, Linux, LibC { you can add units after this };
type
{ TEPoll_Server }
TEPoll_Server = class(TCustomApplication) protected procedure DoRun; override; function setnonblocking(sockfd :Integer) :Integer; function handle_message(new_fd :Integer) :Integer; public constructor Create(TheOwner: TComponent); override; destructor Destroy; override; end;
{ TEPoll_Server } const
MAXBUF = 1024; MAXEPOLLSIZE = 10000;
function TEPoll_Server.setnonblocking(sockfd :Integer) :Integer; var
iResult :Integer;
begin
iResult := fcntl(sockfd, F_SETFL, O_NONBLOCK); Result := iResult;
end;
//handle_message - 处理每个 socket 上的消息收发
function TEPoll_Server.handle_message(new_fd :Integer) :Integer;
var
buf :Array[0..MAXBUF-1] of Char; len :Integer; sTmp:String;
begin
//* 开始处理每个新连接上的数据收发 */ memset(@buf, 0, MAXBUF);
len := LibC.Recv(new_fd,buf[0],MAXBUF,0); //* 接收客户端的消息 */ //len = recv(new_fd, buf, MAXBUF, 0);
if (len > 0) then begin SetLength(sTMp,len); Move(buf[0],sTmp[1],len); Writeln('消息 : 句柄,' + IntToStr(new_fd) + '>>>' + Trim(sTmp)); if libc.send(new_fd,sTmp[1],len,0) <> len then Writeln('错误 : 消息发送到[' + IntToStr(new_fd) + ']失败,' + IntToStr(ErrNO) + ',' + StrError(ErrNO)); end else if (len < 0) then begin ShowException(Exception.Create('错误 : 消息接收失败:' + IntToStr(ErrNO) + ',' + StrError(ErrNO))); __close(new_fd); Result := -1 end else if (len = 0) then begin Writeln('消息 : 句柄,' + IntToStr(new_fd) + '已断开与服务器的链接.'); __close(new_fd); Result := -1 end; //* 处理每个新连接上的数据收发结束 */ Result := len;
end;
procedure TEPoll_Server.DoRun; var
ErrorMsg: String; my_addr, their_addr :sockaddr_in; listener, new_fd, kdpfd, nfds, n, ret, curfds :Integer; len :Integer; myport :Integer; ev :epoll_event; events :array [0..MAXEPOLLSIZE-1] of epoll_event; rt :rlimit; sTmp :String;
begin
// quick check parameters ErrorMsg:=CheckOptions('p','port'); if ErrorMsg<> then begin ShowException(Exception.Create(ErrorMsg)); Terminate; Exit; end;
//获取参数 -p 端口号 sTmp := Trim(GetOptionValue('p')); myport := -1; myport := StrToIntDef(sTmp, -1); if (myPort < $0001) or (myPort > $FFFF) then begin ShowException(Exception.Create('错误 : 无效的端口号:' + sTmp)); Terminate; Exit; end;
//设置每个进程允许打开的最大文件数 rt.rlim_max := MAXEPOLLSIZE; rt.rlim_cur := MAXEPOLLSIZE; if (setrlimit(RLIMIT_NOFILE, @rt) = -1) then begin ShowException(Exception.Create('错误 : 设置系统资源参数失败:' + IntToStr(ErrNO) + ',' + StrError(ErrNO))); Terminate; Exit; end else Writeln('信息 : 设置系统资源参数成功!');
//创建文件句柄 listener := socket(AF_INET, SOCK_STREAM, IPPROTO_IP); if listener = SOCKET_ERROR then begin ShowException(Exception.Create('错误 : 创建Socket句柄失败:' + IntToStr(ErrNO) + ',' + StrError(ErrNO))); Terminate; Exit; end;
//设置非堵塞模式 if setnonblocking(listener) = SOCKET_ERROR then begin ShowException(Exception.Create('错误 : 设置非堵塞模式失败:' + IntToStr(ErrNO) + ',' + StrError(ErrNO))); Terminate; Exit; end;
//绑定地址与端口 memset(@my_addr, 0, SizeOf(my_addr)); my_addr.sa_family := AF_INET; my_addr.sin_port := htons(myPort); if LibC.bind(listener, my_addr, SizeOf(my_addr)) = SOCKET_ERROR then begin ShowException(Exception.Create('错误 : 绑定地址与端口失败:' + IntToStr(ErrNO) + ',' + StrError(ErrNO))); Terminate; Exit; end;
// 监听失败 if LibC.listen(listener, 128) = SOCKET_ERROR then begin ShowException(Exception.Create('错误 : 监听失败:' + IntToStr(ErrNO) + ',' + StrError(ErrNO))); Terminate; Exit; end;
//创建EPOLL kdpfd := epoll_create(MAXEPOLLSIZE); len := SizeOf(sockaddr_in); ev.events := EPOLLIN or EPOLLET; ev.data.fd := listener; if epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener, @ev) = SOCKET_ERROR then begin ShowException(Exception.Create('错误 : 创建EPOLL失败:' + IntToStr(ErrNO) + ',' + StrError(ErrNO))); Terminate; Exit; end; curfds := 1;
writeln('信息 : 服务已启动,端口 : ' + IntToStr(myPort));
while true do begin //等待有事件发生 nfds := epoll_wait(kdpfd, events, curfds, -1); if (nfds = -1) then begin ShowException(Exception.Create('错误 : epoll_wait失败:' + IntToStr(ErrNO) + ',' + StrError(ErrNO))); //break; end;
//处理所有事件 */ for n := 0 to nfds -1 do begin if (events[n].data.fd = listener) then begin new_fd := accept(listener, @their_addr, @len); if (new_fd < 0) then begin ShowException(Exception.Create('错误 : 接入传入连接时发生:' + IntToStr(ErrNO) + ',' + StrError(ErrNO))); continue; end else begin Writeln('信息 : [' + inet_ntoa(their_addr.sin_addr) + ':' + IntToStr(ntohs(their_addr.sin_port)) + ']连接到服务器,句柄:' + IntToStr(new_fd)); setnonblocking(new_fd); ev.events := EPOLLIN or EPOLLET; ev.data.fd := new_fd; if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, new_fd, @ev) = SOCKET_ERROR) then begin Writeln('错误 : 把Socket' + IntToStr(new_fd) + '加入 epoll 失败!,' + IntToStr(ErrNO) + StrError(ErrNO)); __close(new_fd); end;
end;
end else begin
ret := handle_message(events[n].data.fd); //座席断开 if (ret = SOCKET_ERROR) and (ErrNO <> EAGAIN) then // (errno <> 11) begin epoll_ctl(kdpfd, EPOLL_CTL_DEL, events[n].data.fd,@ev); //curfds := curfds - 1;
end; end; end;
end; __close(listener); Terminate;
end;
constructor TEPoll_Server.Create(TheOwner: TComponent); begin
inherited Create(TheOwner); StopOnException:=True;
end;
destructor TEPoll_Server.Destroy; begin
inherited Destroy;
end;
var
Application: TEPoll_Server;
{$IFDEF WINDOWS}{$R epoll_server.rc}{$ENDIF}
begin
Application:=TEPoll_Server.Create(nil); Application.Title:='TEPoll_Server'; Application.Run; Application.Free;
end.