免费做网站哪个好,公众号seo排名软件,做一个app需要什么流程,网站建设教程百度云From: http://blog.csdn.net/woshinia/article/details/8585930 部分代码参考《[WINDOWS网络与通信程序设计].王艳平》#xff0c;网络中一些I/O模型的代码都没有对socket是否可写做过深入研究#xff0c;我这边会提供一些解决方法。
阻塞模式下#xff0c;send会发生阻塞…From: http://blog.csdn.net/woshinia/article/details/8585930 部分代码参考《[WINDOWS网络与通信程序设计].王艳平》网络中一些I/O模型的代码都没有对socket是否可写做过深入研究我这边会提供一些解决方法。
阻塞模式下send会发生阻塞非阻塞模式下send返回WSAEWOULDBLOCK错误重叠I/O下表现为投递的发送请求一直无法完成的情况一般可以分为3种 :
1, 服务器虽然发送了大量数据但客户端并未调用recv函数去接。
2网络状况不佳发送缓冲区中的数据一直发不出去。
3发送数据量很大如下载功能协议发送数据的速度比不上send函数将数据拷贝到发送缓冲区的速度。
对于1,2情况我们似乎可以直接关闭套接字让客户端重新请求。但对于3却不行。而且实际操作过程中我们无法区分是1,2,还是3我们能做的是尽量去保证发送的正确性。当然防止1情况或者2情况中长时间网络不畅可以设定超时。若socket一直处于不可写状态超过1分钟那么就关闭套接字。在最后的IOCP模型中就加入了这种超时机制。其他模型若要加入可参考它来做。 一基本的阻塞模型 [cpp] view plaincopyprint? #include WinSock2.h #include Windows.h #include stdio.h #pragma comment(lib,Ws2_32.lib) DWORD WINAPI WorkThread(void* param) { SOCKET* psClient (SOCKET*)param; char buf[4096]; while(true) { int len recv(*psClient,buf,4096,0); if(len 0) { printf(recv失败%d\n,WSAGetLastError()); Sleep(5000); break; } buf[len] \0; printf(收到数据%s\n,buf); } closesocket(*psClient); delete psClient; return 0; } int main() { WSAData wsaData; if(0 ! WSAStartup(MAKEWORD(2,2),wsaData)) { printf(WSAStartup失败\n,WSAGetLastError()); Sleep(5000); return 0; } USHORT nPort 3456; SOCKET sListen socket(AF_INET,SOCK_STREAM,IPPROTO_TCP); sockaddr_in sin; sin.sin_family AF_INET; sin.sin_port htons(nPort); sin.sin_addr.S_un.S_addr INADDR_ANY; if(SOCKET_ERROR ::bind(sListen,(sockaddr*)sin,sizeof(sin))) { printf(bind失败%d\n,WSAGetLastError()); Sleep(5000); return -1; } ::listen(sListen,5); while(true) { sockaddr_in addrRemote; int nAddrLen sizeof(addrRemote); SOCKET *psClient new SOCKET; *psClient accept(sListen,(sockaddr*)addrRemote,nAddrLen); HANDLE hThread CreateThread(NULL,0,WorkThread,psClient,0,NULL); CloseHandle(hThread); } closesocket(sListen); WSACleanup(); } #include WinSock2.h
#include Windows.h
#include stdio.h
#pragma comment(lib,Ws2_32.lib)
DWORD WINAPI WorkThread(void* param)
{
SOCKET* psClient (SOCKET*)param;
char buf[4096];
while(true)
{
int len recv(*psClient,buf,4096,0);
if(len 0)
{
printf(recv失败%d\n,WSAGetLastError());
Sleep(5000);
break;
}
buf[len] \0;
printf(收到数据%s\n,buf);
}
closesocket(*psClient);
delete psClient;
return 0;
}
int main()
{
WSAData wsaData;
if(0 ! WSAStartup(MAKEWORD(2,2),wsaData))
{
printf(WSAStartup失败\n,WSAGetLastError());
Sleep(5000);
return 0;
}
USHORT nPort 3456;
SOCKET sListen socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
sockaddr_in sin;
sin.sin_family AF_INET;
sin.sin_port htons(nPort);
sin.sin_addr.S_un.S_addr INADDR_ANY;
if(SOCKET_ERROR ::bind(sListen,(sockaddr*)sin,sizeof(sin)))
{
printf(bind失败%d\n,WSAGetLastError());
Sleep(5000);
return -1;
}
::listen(sListen,5);
while(true)
{
sockaddr_in addrRemote;
int nAddrLen sizeof(addrRemote);
SOCKET *psClient new SOCKET;
*psClient accept(sListen,(sockaddr*)addrRemote,nAddrLen);
HANDLE hThread CreateThread(NULL,0,WorkThread,psClient,0,NULL);
CloseHandle(hThread);
}
closesocket(sListen);
WSACleanup();
}二无任何优化的非阻塞模型 [cpp] view plaincopyprint? #include WinSock2.h #include Windows.h #include stdio.h #include vector using namespace std; #pragma comment(lib,Ws2_32.lib) CRITICAL_SECTION g_cs; HANDLE g_StartEvent; vectorSOCKET g_vecClients; int g_iVecSize 0; DWORD WINAPI WorkThread(void* param) { char buf[4096]; while(1) { if(g_vecClients.empty()) { ResetEvent(g_StartEvent); WaitForSingleObject(g_StartEvent,INFINITE); } EnterCriticalSection(g_cs); for(vectorSOCKET::iterator it g_vecClients.begin();it ! g_vecClients.end();) { int len recv(*it,buf,4096,0); if(len SOCKET_ERROR) { if(WSAEWOULDBLOCK ! WSAGetLastError()) { printf(recv Error:%d\n,WSAGetLastError()); closesocket(*it); it g_vecClients.erase(it); } else { printf(%d.,*it); it; } } else { buf[len] 0; printf(收到数据: %s\n,buf); it; } } LeaveCriticalSection(g_cs); Sleep(100); } return 0; } int main() { InitializeCriticalSectionAndSpinCount(g_cs,4000); g_StartEvent CreateEvent(NULL,FALSE,FALSE,NULL); WSAData wsaDate; WSAStartup(MAKEWORD(2,2),wsaDate); USHORT nport 3456; u_long ul 1; SOCKET s socket(AF_INET,SOCK_STREAM,0); ioctlsocket(s,FIONBIO,ul); sockaddr_in sin; sin.sin_family AF_INET; sin.sin_port htons(nport); sin.sin_addr.S_un.S_addr ADDR_ANY; if(SOCKET_ERROR ::bind(s,(sockaddr*)sin,sizeof(sin))) { return -1; } ::listen(s,5); HANDLE hThread CreateThread(NULL,0,WorkThread,NULL,0,NULL); CloseHandle(hThread); while(true) { sockaddr_in addrRemote; int nAddrLen sizeof(addrRemote); SOCKET sClient accept(s,(sockaddr*)addrRemote,nAddrLen); if(sClient ! SOCKET_ERROR) { EnterCriticalSection(g_cs); g_vecClients.push_back(sClient); LeaveCriticalSection(g_cs); if(g_vecClients.size() 1) SetEvent(g_StartEvent); } else if(WSAEWOULDBLOCK WSAGetLastError()) { printf(.); Sleep(100); } else { printf(accept failed! %d\n,WSAGetLastError()); } } closesocket(s); WSACleanup(); CloseHandle(g_StartEvent); DeleteCriticalSection(g_cs); } #include WinSock2.h
#include Windows.h
#include stdio.h
#include vector
using namespace std;
#pragma comment(lib,Ws2_32.lib)
CRITICAL_SECTION g_cs;
HANDLE g_StartEvent;
vectorSOCKET g_vecClients;
int g_iVecSize 0;
DWORD WINAPI WorkThread(void* param)
{
char buf[4096];
while(1)
{
if(g_vecClients.empty())
{
ResetEvent(g_StartEvent);
WaitForSingleObject(g_StartEvent,INFINITE);
}
EnterCriticalSection(g_cs);
for(vectorSOCKET::iterator it g_vecClients.begin();it ! g_vecClients.end();)
{
int len recv(*it,buf,4096,0);
if(len SOCKET_ERROR)
{
if(WSAEWOULDBLOCK ! WSAGetLastError())
{
printf(recv Error:%d\n,WSAGetLastError());
closesocket(*it);
it g_vecClients.erase(it);
}
else
{
printf(%d.,*it);
it;
}
}
else
{
buf[len] 0;
printf(收到数据: %s\n,buf);
it;
}
}
LeaveCriticalSection(g_cs);
Sleep(100);
}
return 0;
}
int main()
{
InitializeCriticalSectionAndSpinCount(g_cs,4000);
g_StartEvent CreateEvent(NULL,FALSE,FALSE,NULL);
WSAData wsaDate;
WSAStartup(MAKEWORD(2,2),wsaDate);
USHORT nport 3456;
u_long ul 1;
SOCKET s socket(AF_INET,SOCK_STREAM,0);
ioctlsocket(s,FIONBIO,ul);
sockaddr_in sin;
sin.sin_family AF_INET;
sin.sin_port htons(nport);
sin.sin_addr.S_un.S_addr ADDR_ANY;
if(SOCKET_ERROR ::bind(s,(sockaddr*)sin,sizeof(sin)))
{
return -1;
}
::listen(s,5);
HANDLE hThread CreateThread(NULL,0,WorkThread,NULL,0,NULL);
CloseHandle(hThread);
while(true)
{
sockaddr_in addrRemote;
int nAddrLen sizeof(addrRemote);
SOCKET sClient accept(s,(sockaddr*)addrRemote,nAddrLen);
if(sClient ! SOCKET_ERROR)
{
EnterCriticalSection(g_cs);
g_vecClients.push_back(sClient);
LeaveCriticalSection(g_cs);
if(g_vecClients.size() 1)
SetEvent(g_StartEvent);
}
else if(WSAEWOULDBLOCK WSAGetLastError())
{
printf(.);
Sleep(100);
}
else
{
printf(accept failed! %d\n,WSAGetLastError());
}
}
closesocket(s);
WSACleanup();
CloseHandle(g_StartEvent);
DeleteCriticalSection(g_cs);
}三select模型 [cpp] view plaincopyprint? #include WinSock2.h #include Windows.h #include MSWSock.h #include stdio.h #include map using namespace std; #pragma comment(lib,Ws2_32.lib) #pragma comment(lib,Mswsock.lib) struct ThreadObj{ OVERLAPPED *pOl; HANDLE s; }; int g_iIndex 0; mapSOCKET,char* g_map; int main() { WSAData wsaData; if(0 ! WSAStartup(MAKEWORD(2,2),wsaData)) { printf(初始化失败!%d\n,WSAGetLastError()); Sleep(5000); return -1; } USHORT nport 3456; SOCKET sListen socket(AF_INET,SOCK_STREAM,IPPROTO_TCP); u_long ul 1; ioctlsocket(sListen,FIONBIO,ul); sockaddr_in sin; sin.sin_family AF_INET; sin.sin_port htons(nport); sin.sin_addr.S_un.S_addr ADDR_ANY; if(SOCKET_ERROR bind(sListen,(sockaddr*)sin,sizeof(sin))) { printf(bind failed!%d\n,WSAGetLastError()); Sleep(5000); return -1; } listen(sListen,5); //1)初始化一个套接字集合fdSocket并将监听套接字放入 fd_set fdSocket; FD_ZERO(fdSocket); FD_SET(sListen,fdSocket); TIMEVAL time{1,0}; char buf[4096]; fd_set fdWrite; FD_ZERO(fdWrite); while(true) { //2将fdSocket的一个拷贝fdRead传给select函数 fd_set fdRead fdSocket; fd_set fdTmp fdWrite; int nRetAll 0; if(fdTmp.fd_count 0) nRetAll select(0,fdRead,fdTmp,NULL,NULL/*time*/);//若不设置超时则select为阻塞 else nRetAll select(0,fdRead,NULL,NULL,NULL/*time*/); if(nRetAll 0) { //3通过将原来的fdSocket和被select处理过的fdRead进行比较决定由哪些socket有数据可以读取 for(int i0;ifdSocket.fd_count;i) { if(FD_ISSET(fdSocket.fd_array[i],fdRead)) { if(fdSocket.fd_array[i] sListen) { if(fdSocket.fd_count FD_SETSIZE) { sockaddr_in addrRemote; int nAddrLen sizeof(addrRemote); SOCKET sClient accept(sListen,(sockaddr*)addrRemote,nAddrLen); FD_SET(sClient,fdSocket); printf(接收到连接(%s)\n,inet_ntoa(addrRemote.sin_addr)); } else { printf(连接数量已达上限\n); continue; } } else { int nRecv recv(fdSocket.fd_array[i],buf,4096,0); if(nRecv 0) { buf[nRecv] 0; printf(收到数据%s\n,buf); int nRet send(fdSocket.fd_array[i],buf,nRecv,0); if(nRet 0) { SOCKET s fdSocket.fd_array[i]; if(GetLastError() WSAEWOULDBLOCK) { if(g_map.find(s) g_map.end()) { char* szTmp new char[nRecv 1]; strncpy(szTmp,buf,nRecv); szTmp[nRecv] 0; g_map[s] szTmp; } else { char* szOld g_map[s]; char* szTmp2 new char[strlen(szOld) nRecv 1]; strncpy(szTmp2,szOld,strlen(szOld)); strncpy(szTmp2 strlen(szOld),buf,nRecv); szTmp2[strlen(szOld) nRecv] 0; delete [] szOld; g_map[s] szTmp2; } FD_SET(fdSocket.fd_array[i],fdWrite); } else { closesocket(fdSocket.fd_array[i]); if(g_map.find(s) ! g_map.end()) { if(g_map[s] ! NULL) delete [] g_map[s]; g_map.erase(s); } FD_CLR(fdSocket.fd_array[i],fdSocket); } } printf(发送了%d\n,nRet); } else { printf(1个Client已断开\n); closesocket(fdSocket.fd_array[i]); FD_CLR(fdSocket.fd_array[i],fdSocket); } } } if(FD_ISSET(fdSocket.fd_array[i],fdTmp)) { SOCKET s fdSocket.fd_array[i]; if(g_map.find(s) ! g_map.end()) { char* szToSend g_map[s]; int nToSend strlen(szToSend); int nRet send(fdSocket.fd_array[i],szToSend,nToSend,0); if(nRet 0) { if(GetLastError() WSAEWOULDBLOCK) { //do nothing } else { closesocket(fdSocket.fd_array[i]); if(g_map.find(s) ! g_map.end()) { if(g_map[s] ! NULL) delete [] g_map[s]; g_map.erase(s); } FD_CLR(fdSocket.fd_array[i],fdSocket); } } else if(nRet nToSend) { printf(发送了%d/%d\n,nRet,nToSend); nToSend - nRet; char* szTmp new char[nToSend 1]; strncpy(szTmp,szToSend nRet,nToSend); szTmp[nToSend] 0; delete [] szToSend; g_map[s] szTmp; } else { if(g_map[s] ! NULL) delete [] g_map[s]; g_map.erase(s); FD_CLR(fdSocket.fd_array[i],fdWrite); } printf(发送了%d\n,nRet); } } } } else if(nRetAll 0) { printf(time out!\n); } else { printf(select error!%d\n,WSAGetLastError()); Sleep(5000); break; } } closesocket(sListen); WSACleanup(); } #include WinSock2.h
#include Windows.h
#include MSWSock.h
#include stdio.h
#include map
using namespace std;
#pragma comment(lib,Ws2_32.lib)
#pragma comment(lib,Mswsock.lib)
struct ThreadObj{
OVERLAPPED *pOl;
HANDLE s;
};
int g_iIndex 0;
mapSOCKET,char* g_map;
int main()
{
WSAData wsaData;
if(0 ! WSAStartup(MAKEWORD(2,2),wsaData))
{
printf(初始化失败!%d\n,WSAGetLastError());
Sleep(5000);
return -1;
}
USHORT nport 3456;
SOCKET sListen socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
u_long ul 1;
ioctlsocket(sListen,FIONBIO,ul);
sockaddr_in sin;
sin.sin_family AF_INET;
sin.sin_port htons(nport);
sin.sin_addr.S_un.S_addr ADDR_ANY;
if(SOCKET_ERROR bind(sListen,(sockaddr*)sin,sizeof(sin)))
{
printf(bind failed!%d\n,WSAGetLastError());
Sleep(5000);
return -1;
}
listen(sListen,5);
//1)初始化一个套接字集合fdSocket并将监听套接字放入
fd_set fdSocket;
FD_ZERO(fdSocket);
FD_SET(sListen,fdSocket);
TIMEVAL time{1,0};
char buf[4096];
fd_set fdWrite;
FD_ZERO(fdWrite);
while(true)
{
//2将fdSocket的一个拷贝fdRead传给select函数
fd_set fdRead fdSocket;
fd_set fdTmp fdWrite;
int nRetAll 0;
if(fdTmp.fd_count 0)
nRetAll select(0,fdRead,fdTmp,NULL,NULL/*time*/);//若不设置超时则select为阻塞
else
nRetAll select(0,fdRead,NULL,NULL,NULL/*time*/);
if(nRetAll 0)
{
//3通过将原来的fdSocket和被select处理过的fdRead进行比较决定由哪些socket有数据可以读取
for(int i0;ifdSocket.fd_count;i)
{
if(FD_ISSET(fdSocket.fd_array[i],fdRead))
{
if(fdSocket.fd_array[i] sListen)
{
if(fdSocket.fd_count FD_SETSIZE)
{
sockaddr_in addrRemote;
int nAddrLen sizeof(addrRemote);
SOCKET sClient accept(sListen,(sockaddr*)addrRemote,nAddrLen);
FD_SET(sClient,fdSocket);
printf(接收到连接(%s)\n,inet_ntoa(addrRemote.sin_addr));
}
else
{
printf(连接数量已达上限\n);
continue;
}
}
else
{
int nRecv recv(fdSocket.fd_array[i],buf,4096,0);
if(nRecv 0)
{
buf[nRecv] 0;
printf(收到数据%s\n,buf);
int nRet send(fdSocket.fd_array[i],buf,nRecv,0);
if(nRet 0)
{
SOCKET s fdSocket.fd_array[i];
if(GetLastError() WSAEWOULDBLOCK)
{
if(g_map.find(s) g_map.end())
{
char* szTmp new char[nRecv 1];
strncpy(szTmp,buf,nRecv);
szTmp[nRecv] 0;
g_map[s] szTmp;
}
else
{
char* szOld g_map[s];
char* szTmp2 new char[strlen(szOld) nRecv 1];
strncpy(szTmp2,szOld,strlen(szOld));
strncpy(szTmp2 strlen(szOld),buf,nRecv);
szTmp2[strlen(szOld) nRecv] 0;
delete [] szOld;
g_map[s] szTmp2;
}
FD_SET(fdSocket.fd_array[i],fdWrite);
}
else
{
closesocket(fdSocket.fd_array[i]);
if(g_map.find(s) ! g_map.end())
{
if(g_map[s] ! NULL)
delete [] g_map[s];
g_map.erase(s);
}
FD_CLR(fdSocket.fd_array[i],fdSocket);
}
}
printf(发送了%d\n,nRet);
}
else
{
printf(1个Client已断开\n);
closesocket(fdSocket.fd_array[i]);
FD_CLR(fdSocket.fd_array[i],fdSocket);
}
}
}
if(FD_ISSET(fdSocket.fd_array[i],fdTmp))
{
SOCKET s fdSocket.fd_array[i];
if(g_map.find(s) ! g_map.end())
{
char* szToSend g_map[s];
int nToSend strlen(szToSend);
int nRet send(fdSocket.fd_array[i],szToSend,nToSend,0);
if(nRet 0)
{
if(GetLastError() WSAEWOULDBLOCK)
{
//do nothing
}
else
{
closesocket(fdSocket.fd_array[i]);
if(g_map.find(s) ! g_map.end())
{
if(g_map[s] ! NULL)
delete [] g_map[s];
g_map.erase(s);
}
FD_CLR(fdSocket.fd_array[i],fdSocket);
}
}
else if(nRet nToSend)
{
printf(发送了%d/%d\n,nRet,nToSend);
nToSend - nRet;
char* szTmp new char[nToSend 1];
strncpy(szTmp,szToSend nRet,nToSend);
szTmp[nToSend] 0;
delete [] szToSend;
g_map[s] szTmp;
}
else
{
if(g_map[s] ! NULL)
delete [] g_map[s];
g_map.erase(s);
FD_CLR(fdSocket.fd_array[i],fdWrite);
}
printf(发送了%d\n,nRet);
}
}
}
}
else if(nRetAll 0)
{
printf(time out!\n);
}
else
{
printf(select error!%d\n,WSAGetLastError());
Sleep(5000);
break;
}
}
closesocket(sListen);
WSACleanup();
}四异步选择模型
注意收到FD_Write消息有2种情况1在socket第一次和窗口句柄绑定后。2socket从不可写状态变成可写状态。下面的事件选择模型也是同理。 [cpp] view plaincopyprint? #include WinSock2.h #include Windows.h #include stdio.h #include map using namespace std; #pragma comment(lib,Ws2_32.lib) #define WM_SOCKET (WM_USER 100) mapSOCKET,char* g_map; LRESULT WINAPI WindowProc(HWND hwnd,UINT uMsg,WPARAM wParam,LPARAM lParam) { switch(uMsg) { case WM_SOCKET: { SOCKET s wParam; if(WSAGETSELECTERROR(lParam)) { printf(消息错误\n); closesocket(s); return 0; } switch(WSAGETSELECTEVENT(lParam)) { case FD_ACCEPT: { sockaddr_in addrRemote; int nAddrLen sizeof(addrRemote); SOCKET sClient accept(s,(sockaddr*)addrRemote,nAddrLen); WSAAsyncSelect(sClient,hwnd,WM_SOCKET,FD_READ | FD_WRITE | FD_CLOSE); }break; case FD_WRITE: { printf(write\n); if(!g_map.empty()) { char* buf g_map[s]; int nLenth strlen(buf); while(nLenth 0) { int nRet send(s,buf,nLenth,0); if(nRet 0) { buf nRet; nLenth - nRet; } else if(10035 GetLastError()) { char* newBuf new char[nLenth 1]; strncpy(newBuf,buf,nLenth); newBuf[nLenth] 0; delete [] g_map[s]; g_map[s] newBuf; break; } else { delete [] g_map[s]; g_map.erase(s); closesocket(s); } } if(nLenth 0) { g_map.erase(s); } } }break; case FD_READ: { char buf[4096]; int nRet recv(s,buf,4096,0); if(nRet 0) { buf[nRet] 0; //printf(收到数据%s\n,buf); int x send(s,buf,nRet,0); printf(已发送字节数%d , 线程号%d\n,x,GetCurrentThreadId()); if(x 0) { int iError GetLastError(); printf(数据:%s ,错误%d\n,buf,iError); if(10035 iError) { if(g_map.end() ! g_map.find(s)) { int newLength strlen(g_map[s]) strlen(buf); char* newBuf new char[newLength 1]; strncpy(newBuf,g_map[s],strlen(g_map[s])); strncpy(newBufstrlen(g_map[s]),buf,strlen(buf)); newBuf[newLength] 0; delete [] g_map[s]; g_map[s] newBuf; } else { char* newBuf new char[strlen(buf) 1]; strncpy(newBuf,buf,strlen(buf)); newBuf[strlen(buf)] 0; g_map[s] newBuf; } } else { if(g_map.end() ! g_map.find(s)) { delete [] g_map[s]; g_map.erase(s); } closesocket(s); } } } else { printf(1个Client已经断开1111\n); if(g_map.end() ! g_map.find(s)) { delete [] g_map[s]; g_map.erase(s); } closesocket(s); } }break; case FD_CLOSE: { printf(1个Client已经断开222\n); if(g_map.end() ! g_map.find(s)) { delete [] g_map[s]; g_map.erase(s); } closesocket(s); }break; } }break; case WM_DESTROY: { printf(窗口已关闭\n); PostQuitMessage(0); } } return DefWindowProc(hwnd,uMsg,wParam,lParam); } int main() { char szClassName[] WSAAsyncSelect Test; static WNDCLASSEX wndClass; wndClass.cbSize sizeof(wndClass); wndClass.style CS_HREDRAW | CS_VREDRAW; wndClass.lpfnWndProc WindowProc; wndClass.cbClsExtra 0; wndClass.cbWndExtra 0; wndClass.hInstance GetModuleHandle(0); wndClass.hIcon LoadIcon(NULL,IDI_APPLICATION); wndClass.hCursor LoadCursor(NULL,IDC_ARROW); wndClass.hbrBackground (HBRUSH)GetStockObject(WHITE_BRUSH); wndClass.lpszMenuName NULL; wndClass.lpszClassName szClassName; wndClass.hIconSm NULL; ATOM atom RegisterClassEx(wndClass); if(0 atom) { char error[256]; sprintf(error,RegisterClassEx错误%d,GetLastError()); MessageBox(NULL,error,error,MB_OK); return -1; } HWND hwnd CreateWindowEx(0,(char *)atom,,WS_OVERLAPPEDWINDOW,CW_USEDEFAULT,CW_USEDEFAULT, CW_USEDEFAULT,CW_USEDEFAULT,HWND_MESSAGE,NULL,NULL,NULL); if(hwnd NULL) { char error[256]; sprintf(error,创建窗口错误%d,GetLastError()); MessageBox(NULL,error,error,MB_OK); return -1; } WSAData wsaData; if(0 ! WSAStartup(MAKEWORD(2,2),wsaData)) { printf(初始化失败!%d\n,WSAGetLastError()); Sleep(5000); return -1; } USHORT nport 3456; SOCKET sListen socket(AF_INET,SOCK_STREAM,IPPROTO_TCP); sockaddr_in sin; sin.sin_family AF_INET; sin.sin_port htons(nport); sin.sin_addr.S_un.S_addr ADDR_ANY; if(SOCKET_ERROR bind(sListen,(sockaddr*)sin,sizeof(sin))) { printf(bind failed!%d\n,WSAGetLastError()); Sleep(5000); return -1; } WSAAsyncSelect(sListen,hwnd,WM_SOCKET,FD_ACCEPT | FD_CLOSE); listen(sListen,5); MSG msg; while(GetMessage(msg,NULL,0,0)) { TranslateMessage(msg); DispatchMessage(msg); } closesocket(sListen); WSACleanup(); return msg.wParam; } #include WinSock2.h
#include Windows.h
#include stdio.h
#include map
using namespace std;
#pragma comment(lib,Ws2_32.lib)
#define WM_SOCKET (WM_USER 100)
mapSOCKET,char* g_map;
LRESULT WINAPI WindowProc(HWND hwnd,UINT uMsg,WPARAM wParam,LPARAM lParam)
{
switch(uMsg)
{
case WM_SOCKET:
{
SOCKET s wParam;
if(WSAGETSELECTERROR(lParam))
{
printf(消息错误\n);
closesocket(s);
return 0;
}
switch(WSAGETSELECTEVENT(lParam))
{
case FD_ACCEPT:
{
sockaddr_in addrRemote;
int nAddrLen sizeof(addrRemote);
SOCKET sClient accept(s,(sockaddr*)addrRemote,nAddrLen);
WSAAsyncSelect(sClient,hwnd,WM_SOCKET,FD_READ | FD_WRITE | FD_CLOSE);
}break;
case FD_WRITE:
{
printf(write\n);
if(!g_map.empty())
{
char* buf g_map[s];
int nLenth strlen(buf);
while(nLenth 0)
{
int nRet send(s,buf,nLenth,0);
if(nRet 0)
{
buf nRet;
nLenth - nRet;
}
else if(10035 GetLastError())
{
char* newBuf new char[nLenth 1];
strncpy(newBuf,buf,nLenth);
newBuf[nLenth] 0;
delete [] g_map[s];
g_map[s] newBuf;
break;
}
else
{
delete [] g_map[s];
g_map.erase(s);
closesocket(s);
}
}
if(nLenth 0)
{
g_map.erase(s);
}
}
}break;
case FD_READ:
{
char buf[4096];
int nRet recv(s,buf,4096,0);
if(nRet 0)
{
buf[nRet] 0;
//printf(收到数据%s\n,buf);
int x send(s,buf,nRet,0);
printf(已发送字节数%d , 线程号%d\n,x,GetCurrentThreadId());
if(x 0)
{
int iError GetLastError();
printf(数据:%s ,错误%d\n,buf,iError);
if(10035 iError)
{
if(g_map.end() ! g_map.find(s))
{
int newLength strlen(g_map[s]) strlen(buf);
char* newBuf new char[newLength 1];
strncpy(newBuf,g_map[s],strlen(g_map[s]));
strncpy(newBufstrlen(g_map[s]),buf,strlen(buf));
newBuf[newLength] 0;
delete [] g_map[s];
g_map[s] newBuf;
}
else
{
char* newBuf new char[strlen(buf) 1];
strncpy(newBuf,buf,strlen(buf));
newBuf[strlen(buf)] 0;
g_map[s] newBuf;
}
}
else
{
if(g_map.end() ! g_map.find(s))
{
delete [] g_map[s];
g_map.erase(s);
}
closesocket(s);
}
}
}
else
{
printf(1个Client已经断开1111\n);
if(g_map.end() ! g_map.find(s))
{
delete [] g_map[s];
g_map.erase(s);
}
closesocket(s);
}
}break;
case FD_CLOSE:
{
printf(1个Client已经断开222\n);
if(g_map.end() ! g_map.find(s))
{
delete [] g_map[s];
g_map.erase(s);
}
closesocket(s);
}break;
}
}break;
case WM_DESTROY:
{
printf(窗口已关闭\n);
PostQuitMessage(0);
}
}
return DefWindowProc(hwnd,uMsg,wParam,lParam);
}
int main()
{
char szClassName[] WSAAsyncSelect Test;
static WNDCLASSEX wndClass;
wndClass.cbSize sizeof(wndClass);
wndClass.style CS_HREDRAW | CS_VREDRAW;
wndClass.lpfnWndProc WindowProc;
wndClass.cbClsExtra 0;
wndClass.cbWndExtra 0;
wndClass.hInstance GetModuleHandle(0);
wndClass.hIcon LoadIcon(NULL,IDI_APPLICATION);
wndClass.hCursor LoadCursor(NULL,IDC_ARROW);
wndClass.hbrBackground (HBRUSH)GetStockObject(WHITE_BRUSH);
wndClass.lpszMenuName NULL;
wndClass.lpszClassName szClassName;
wndClass.hIconSm NULL;
ATOM atom RegisterClassEx(wndClass);
if(0 atom)
{
char error[256];
sprintf(error,RegisterClassEx错误%d,GetLastError());
MessageBox(NULL,error,error,MB_OK);
return -1;
}
HWND hwnd CreateWindowEx(0,(char *)atom,,WS_OVERLAPPEDWINDOW,CW_USEDEFAULT,CW_USEDEFAULT,
CW_USEDEFAULT,CW_USEDEFAULT,HWND_MESSAGE,NULL,NULL,NULL);
if(hwnd NULL)
{
char error[256];
sprintf(error,创建窗口错误%d,GetLastError());
MessageBox(NULL,error,error,MB_OK);
return -1;
}
WSAData wsaData;
if(0 ! WSAStartup(MAKEWORD(2,2),wsaData))
{
printf(初始化失败!%d\n,WSAGetLastError());
Sleep(5000);
return -1;
}
USHORT nport 3456;
SOCKET sListen socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
sockaddr_in sin;
sin.sin_family AF_INET;
sin.sin_port htons(nport);
sin.sin_addr.S_un.S_addr ADDR_ANY;
if(SOCKET_ERROR bind(sListen,(sockaddr*)sin,sizeof(sin)))
{
printf(bind failed!%d\n,WSAGetLastError());
Sleep(5000);
return -1;
}
WSAAsyncSelect(sListen,hwnd,WM_SOCKET,FD_ACCEPT | FD_CLOSE);
listen(sListen,5);
MSG msg;
while(GetMessage(msg,NULL,0,0))
{
TranslateMessage(msg);
DispatchMessage(msg);
}
closesocket(sListen);
WSACleanup();
return msg.wParam;
}五事件选择模型
事件选择模型主要难点是对线程池的使用send操作可以参考异步选择模型。 [cpp] view plaincopyprint? #include WinSock2.h #include Windows.h #include stdio.h #include vector using namespace std; #pragma comment(lib,Ws2_32.lib) typedef struct _THREAD_OBJ { HANDLE events[WSA_MAXIMUM_WAIT_EVENTS]; SOCKET sockets[WSA_MAXIMUM_WAIT_EVENTS]; int nSocksUsed; CRITICAL_SECTION cs; _THREAD_OBJ *pNext; }THREAD_OBJ,*PTHREAD_OBJ; PTHREAD_OBJ g_pThreadList NULL; CRITICAL_SECTION g_cs; BOOL g_bServerRunning FALSE; HANDLE g_hThreads[1000] {0}; int g_nThreadsCount 0; PTHREAD_OBJ CreateThreadObj() { PTHREAD_OBJ pThread new THREAD_OBJ(); if(pThread ! NULL) { InitializeCriticalSectionAndSpinCount(pThread-cs,4000); pThread-events[0] WSACreateEvent(); pThread-nSocksUsed 1; EnterCriticalSection(g_cs); pThread-pNext g_pThreadList; g_pThreadList pThread; LeaveCriticalSection(g_cs); } return pThread; } void FreeThreadObj(PTHREAD_OBJ pThread) { if(pThread NULL) return; EnterCriticalSection(g_cs); PTHREAD_OBJ p g_pThreadList; if(p pThread) { g_pThreadList p-pNext; } else { while(p ! NULL p-pNext ! pThread) { p p-pNext; } if(p ! NULL) { p-pNext pThread-pNext; } } LeaveCriticalSection(g_cs); DeleteCriticalSection(pThread-cs); WSACloseEvent(pThread-events[0]); delete pThread; } LONG g_nTotalConnections; LONG g_nCurrentConnections; BOOL InsertSocket(PTHREAD_OBJ pThread,SOCKET s) { if(pThread NULL || s INVALID_SOCKET) return FALSE; BOOL bRet FALSE; EnterCriticalSection(pThread-cs); if(pThread-nSocksUsed WSA_MAXIMUM_WAIT_EVENTS) { pThread-events[pThread-nSocksUsed] WSACreateEvent(); pThread-sockets[pThread-nSocksUsed] s; WSAEventSelect(s,pThread-events[pThread-nSocksUsed],FD_READ | FD_CLOSE | FD_WRITE); pThread-nSocksUsed; bRet TRUE; WSASetEvent(pThread-events[0]);//通知线程有新的事件加入了需要重新调用WSAWaitFormultipleEvents } LeaveCriticalSection(pThread-cs); if(bRet) { InterlockedIncrement(g_nTotalConnections); InterlockedIncrement(g_nCurrentConnections); } return bRet; } void RemoveSocket(PTHREAD_OBJ pThread,SOCKET s) { if(pThread NULL || s INVALID_SOCKET) return; EnterCriticalSection(pThread-cs); for(int i1;ipThread-nSocksUsed;i) { if(pThread-sockets[i] s) { WSACloseEvent(pThread-events[i]); closesocket(s); for(int ji;jpThread-nSocksUsed - 1;j) { pThread-events[j] pThread-events[j1]; pThread-sockets[j] pThread-sockets[j1]; } pThread-nSocksUsed--; break; } } LeaveCriticalSection(pThread-cs); InterlockedDecrement(g_nCurrentConnections); } BOOL HandleIo(PTHREAD_OBJ pThread,int nIndex) { WSANETWORKEVENTS event; SOCKET s pThread-sockets[nIndex]; HANDLE sEvent pThread-events[nIndex]; if(0 ! WSAEnumNetworkEvents(s,sEvent,event)) { printf(socket error!\n); RemoveSocket(pThread,s); return FALSE; } do { if(event.lNetworkEvents FD_READ) { if(event.iErrorCode[FD_READ_BIT] 0) { char szText[256]; int nRecv recv(s,szText,strlen(szText),0); if(nRecv 0) { szText[nRecv] \0; printf(接收到数据:%s\n,szText); } else { break; } } else break; } else if(event.lNetworkEvents FD_CLOSE) { break; } else if(event.lNetworkEvents FD_WRITE) { printf(FD_WRITE\n); } return TRUE; } while (FALSE); printf(socket error2!\n); RemoveSocket(pThread,s); return FALSE; } DWORD WINAPI ServerThread(LPVOID lpParam) { PTHREAD_OBJ pThread (PTHREAD_OBJ)lpParam; while(TRUE) { int nIndex WSAWaitForMultipleEvents( pThread-nSocksUsed,pThread-events,FALSE,WSA_INFINITE,FALSE); nIndex nIndex - WSA_WAIT_EVENT_0; if(nIndex WSA_WAIT_FAILED || nIndex WSA_WAIT_TIMEOUT) { printf(WSAWaitForMultipleEvents error!\n); continue; } else if(nIndex 0) { ResetEvent(pThread-events[0]); } else { HandleIo(pThread,nIndex); } if(!g_bServerRunning pThread-nSocksUsed 1) break; } FreeThreadObj(pThread); return 0; } BOOL AssignToFreeThread(SOCKET s) { if(s INVALID_SOCKET) return FALSE; BOOL bAllSucceed TRUE; EnterCriticalSection(g_cs); PTHREAD_OBJ pThread g_pThreadList; while(pThread ! NULL) { if(InsertSocket(pThread,s)) { break; } pThread pThread-pNext; } if(pThread NULL) { if(g_nThreadsCount 1000) { pThread CreateThreadObj(); HANDLE hThread CreateThread(NULL,0,ServerThread,pThread,0,NULL); if(!hThread) { bAllSucceed FALSE; FreeThreadObj(pThread); } else { g_hThreads[g_nThreadsCount] hThread; InsertSocket(pThread,s); } } else bAllSucceed FALSE; } LeaveCriticalSection(g_cs); return bAllSucceed; } DWORD WINAPI ControlThread(LPVOID lpParma) { HANDLE wsaEvent (HANDLE)lpParma; char cmd[128]; while(scanf(%s,cmd)) { if(cmd[0] s) { g_bServerRunning FALSE; EnterCriticalSection(g_cs); PTHREAD_OBJ pThread g_pThreadList; while(pThread ! NULL) { EnterCriticalSection(pThread-cs); for(int i0;ipThread-nSocksUsed;i) { closesocket(pThread-sockets[i]); } WSASetEvent(pThread-events[0]); LeaveCriticalSection(pThread-cs); pThread pThread-pNext; } LeaveCriticalSection(g_cs); WSASetEvent(wsaEvent); break; } } return 0; } int main() { WSAData wsaData; if(0 ! WSAStartup(MAKEWORD(2,2),wsaData)) { printf(初始化失败!%d\n,WSAGetLastError()); Sleep(5000); return -1; } USHORT nport 3456; SOCKET sListen socket(AF_INET,SOCK_STREAM,IPPROTO_TCP); sockaddr_in sin; sin.sin_family AF_INET; sin.sin_port htons(nport); sin.sin_addr.S_un.S_addr ADDR_ANY; if(SOCKET_ERROR bind(sListen,(sockaddr*)sin,sizeof(sin))) { printf(bind failed!%d\n,WSAGetLastError()); Sleep(5000); return -1; } listen(sListen,200); WSAEVENT wsaEvent WSACreateEvent(); WSAEventSelect(sListen,wsaEvent,FD_ACCEPT | FD_CLOSE); InitializeCriticalSectionAndSpinCount(g_cs,4000); g_bServerRunning TRUE; HANDLE hThread CreateThread(NULL,0,ControlThread,wsaEvent,0,NULL); CloseHandle(hThread); while(TRUE) { int nRet WaitForSingleObject(wsaEvent,5*1000); if(!g_bServerRunning) { closesocket(sListen); WSACloseEvent(wsaEvent); WSAWaitForMultipleEvents(g_nThreadsCount,g_hThreads,TRUE,INFINITE,FALSE); for(int i0;ig_nThreadsCount;i) { CloseHandle(g_hThreads[i]); } break; } if(nRet WAIT_FAILED) { printf(WaitForSingleObject Failed!\n); break; } else if(nRet WAIT_TIMEOUT) { printf(\nTotalConnections: %d\nCurrentConnections: %d\nThreads:%d\n, g_nTotalConnections,g_nCurrentConnections,g_nThreadsCount); continue; } else { ResetEvent(wsaEvent); while(TRUE) { sockaddr_in addrRemote; int nLen sizeof(addrRemote); SOCKET sNew accept(sListen,(sockaddr*)addrRemote,nLen); if(sNew SOCKET_ERROR) break; if(!AssignToFreeThread(sNew)) { closesocket(sNew); printf(AssignToFreeThread Failed\n); } } } } DeleteCriticalSection(g_cs); return 0; } #include WinSock2.h
#include Windows.h
#include stdio.h
#include vector
using namespace std;
#pragma comment(lib,Ws2_32.lib)
typedef struct _THREAD_OBJ
{
HANDLE events[WSA_MAXIMUM_WAIT_EVENTS];
SOCKET sockets[WSA_MAXIMUM_WAIT_EVENTS];
int nSocksUsed;
CRITICAL_SECTION cs;
_THREAD_OBJ *pNext;
}THREAD_OBJ,*PTHREAD_OBJ;
PTHREAD_OBJ g_pThreadList NULL;
CRITICAL_SECTION g_cs;
BOOL g_bServerRunning FALSE;
HANDLE g_hThreads[1000] {0};
int g_nThreadsCount 0;
PTHREAD_OBJ CreateThreadObj()
{
PTHREAD_OBJ pThread new THREAD_OBJ();
if(pThread ! NULL)
{
InitializeCriticalSectionAndSpinCount(pThread-cs,4000);
pThread-events[0] WSACreateEvent();
pThread-nSocksUsed 1;
EnterCriticalSection(g_cs);
pThread-pNext g_pThreadList;
g_pThreadList pThread;
LeaveCriticalSection(g_cs);
}
return pThread;
}
void FreeThreadObj(PTHREAD_OBJ pThread)
{
if(pThread NULL)
return;
EnterCriticalSection(g_cs);
PTHREAD_OBJ p g_pThreadList;
if(p pThread)
{
g_pThreadList p-pNext;
}
else
{
while(p ! NULL p-pNext ! pThread)
{
p p-pNext;
}
if(p ! NULL)
{
p-pNext pThread-pNext;
}
}
LeaveCriticalSection(g_cs);
DeleteCriticalSection(pThread-cs);
WSACloseEvent(pThread-events[0]);
delete pThread;
}
LONG g_nTotalConnections;
LONG g_nCurrentConnections;
BOOL InsertSocket(PTHREAD_OBJ pThread,SOCKET s)
{
if(pThread NULL || s INVALID_SOCKET)
return FALSE;
BOOL bRet FALSE;
EnterCriticalSection(pThread-cs);
if(pThread-nSocksUsed WSA_MAXIMUM_WAIT_EVENTS)
{
pThread-events[pThread-nSocksUsed] WSACreateEvent();
pThread-sockets[pThread-nSocksUsed] s;
WSAEventSelect(s,pThread-events[pThread-nSocksUsed],FD_READ | FD_CLOSE | FD_WRITE);
pThread-nSocksUsed;
bRet TRUE;
WSASetEvent(pThread-events[0]);//通知线程有新的事件加入了需要重新调用WSAWaitFormultipleEvents
}
LeaveCriticalSection(pThread-cs);
if(bRet)
{
InterlockedIncrement(g_nTotalConnections);
InterlockedIncrement(g_nCurrentConnections);
}
return bRet;
}
void RemoveSocket(PTHREAD_OBJ pThread,SOCKET s)
{
if(pThread NULL || s INVALID_SOCKET)
return;
EnterCriticalSection(pThread-cs);
for(int i1;ipThread-nSocksUsed;i)
{
if(pThread-sockets[i] s)
{
WSACloseEvent(pThread-events[i]);
closesocket(s);
for(int ji;jpThread-nSocksUsed - 1;j)
{
pThread-events[j] pThread-events[j1];
pThread-sockets[j] pThread-sockets[j1];
}
pThread-nSocksUsed--;
break;
}
}
LeaveCriticalSection(pThread-cs);
InterlockedDecrement(g_nCurrentConnections);
}
BOOL HandleIo(PTHREAD_OBJ pThread,int nIndex)
{
WSANETWORKEVENTS event;
SOCKET s pThread-sockets[nIndex];
HANDLE sEvent pThread-events[nIndex];
if(0 ! WSAEnumNetworkEvents(s,sEvent,event))
{
printf(socket error!\n);
RemoveSocket(pThread,s);
return FALSE;
}
do
{
if(event.lNetworkEvents FD_READ)
{
if(event.iErrorCode[FD_READ_BIT] 0)
{
char szText[256];
int nRecv recv(s,szText,strlen(szText),0);
if(nRecv 0)
{
szText[nRecv] \0;
printf(接收到数据:%s\n,szText);
}
else
{
break;
}
}
else
break;
}
else if(event.lNetworkEvents FD_CLOSE)
{
break;
}
else if(event.lNetworkEvents FD_WRITE)
{
printf(FD_WRITE\n);
}
return TRUE;
} while (FALSE);
printf(socket error2!\n);
RemoveSocket(pThread,s);
return FALSE;
}
DWORD WINAPI ServerThread(LPVOID lpParam)
{
PTHREAD_OBJ pThread (PTHREAD_OBJ)lpParam;
while(TRUE)
{
int nIndex WSAWaitForMultipleEvents(
pThread-nSocksUsed,pThread-events,FALSE,WSA_INFINITE,FALSE);
nIndex nIndex - WSA_WAIT_EVENT_0;
if(nIndex WSA_WAIT_FAILED || nIndex WSA_WAIT_TIMEOUT)
{
printf(WSAWaitForMultipleEvents error!\n);
continue;
}
else if(nIndex 0)
{
ResetEvent(pThread-events[0]);
}
else
{
HandleIo(pThread,nIndex);
}
if(!g_bServerRunning pThread-nSocksUsed 1)
break;
}
FreeThreadObj(pThread);
return 0;
}
BOOL AssignToFreeThread(SOCKET s)
{
if(s INVALID_SOCKET)
return FALSE;
BOOL bAllSucceed TRUE;
EnterCriticalSection(g_cs);
PTHREAD_OBJ pThread g_pThreadList;
while(pThread ! NULL)
{
if(InsertSocket(pThread,s))
{
break;
}
pThread pThread-pNext;
}
if(pThread NULL)
{
if(g_nThreadsCount 1000)
{
pThread CreateThreadObj();
HANDLE hThread CreateThread(NULL,0,ServerThread,pThread,0,NULL);
if(!hThread)
{
bAllSucceed FALSE;
FreeThreadObj(pThread);
}
else
{
g_hThreads[g_nThreadsCount] hThread;
InsertSocket(pThread,s);
}
}
else
bAllSucceed FALSE;
}
LeaveCriticalSection(g_cs);
return bAllSucceed;
}
DWORD WINAPI ControlThread(LPVOID lpParma)
{
HANDLE wsaEvent (HANDLE)lpParma;
char cmd[128];
while(scanf(%s,cmd))
{
if(cmd[0] s)
{
g_bServerRunning FALSE;
EnterCriticalSection(g_cs);
PTHREAD_OBJ pThread g_pThreadList;
while(pThread ! NULL)
{
EnterCriticalSection(pThread-cs);
for(int i0;ipThread-nSocksUsed;i)
{
closesocket(pThread-sockets[i]);
}
WSASetEvent(pThread-events[0]);
LeaveCriticalSection(pThread-cs);
pThread pThread-pNext;
}
LeaveCriticalSection(g_cs);
WSASetEvent(wsaEvent);
break;
}
}
return 0;
}
int main()
{
WSAData wsaData;
if(0 ! WSAStartup(MAKEWORD(2,2),wsaData))
{
printf(初始化失败!%d\n,WSAGetLastError());
Sleep(5000);
return -1;
}
USHORT nport 3456;
SOCKET sListen socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
sockaddr_in sin;
sin.sin_family AF_INET;
sin.sin_port htons(nport);
sin.sin_addr.S_un.S_addr ADDR_ANY;
if(SOCKET_ERROR bind(sListen,(sockaddr*)sin,sizeof(sin)))
{
printf(bind failed!%d\n,WSAGetLastError());
Sleep(5000);
return -1;
}
listen(sListen,200);
WSAEVENT wsaEvent WSACreateEvent();
WSAEventSelect(sListen,wsaEvent,FD_ACCEPT | FD_CLOSE);
InitializeCriticalSectionAndSpinCount(g_cs,4000);
g_bServerRunning TRUE;
HANDLE hThread CreateThread(NULL,0,ControlThread,wsaEvent,0,NULL);
CloseHandle(hThread);
while(TRUE)
{
int nRet WaitForSingleObject(wsaEvent,5*1000);
if(!g_bServerRunning)
{
closesocket(sListen);
WSACloseEvent(wsaEvent);
WSAWaitForMultipleEvents(g_nThreadsCount,g_hThreads,TRUE,INFINITE,FALSE);
for(int i0;ig_nThreadsCount;i)
{
CloseHandle(g_hThreads[i]);
}
break;
}
if(nRet WAIT_FAILED)
{
printf(WaitForSingleObject Failed!\n);
break;
}
else if(nRet WAIT_TIMEOUT)
{
printf(\nTotalConnections: %d\nCurrentConnections: %d\nThreads:%d\n,
g_nTotalConnections,g_nCurrentConnections,g_nThreadsCount);
continue;
}
else
{
ResetEvent(wsaEvent);
while(TRUE)
{
sockaddr_in addrRemote;
int nLen sizeof(addrRemote);
SOCKET sNew accept(sListen,(sockaddr*)addrRemote,nLen);
if(sNew SOCKET_ERROR)
break;
if(!AssignToFreeThread(sNew))
{
closesocket(sNew);
printf(AssignToFreeThread Failed\n);
}
}
}
}
DeleteCriticalSection(g_cs);
return 0;
} 六重叠I/O模型。
若需要建线程池可参考事件选择模型。若纠结于send可参考下面的IOCP。 [cpp] view plaincopyprint? #include WinSock2.h #include Windows.h #include MSWSock.h #include stdio.h #pragma comment(lib,Ws2_32.lib) #define BUFFER_SIZE 4096 typedef struct _SOCKET_OBJ { SOCKET s; int nOutstandingOps; LPFN_ACCEPTEX lpfnAcceptEx; }SOCKET_OBJ,*PSOCKET_OBJ; PSOCKET_OBJ CreateSocketObj(SOCKET s) { PSOCKET_OBJ pSocket new SOCKET_OBJ(); if(pSocket ! NULL) pSocket-s s; return pSocket; } void FreeSocketObj(PSOCKET_OBJ pSocket) { if(pSocket NULL) return; if(pSocket-s ! INVALID_SOCKET) closesocket(pSocket-s); delete pSocket; } typedef struct _BUFFER_OBJ { OVERLAPPED ol; char* buff; int nLen; PSOCKET_OBJ pSocket; int nOperation; #define OP_ACCEPT 1 #define OP_READ 2 #define OP_WRITE 3 SOCKET sAccept; _BUFFER_OBJ* pNext; }BUFFER_OBJ,*PBUFFER_OBJ; HANDLE g_events[WSA_MAXIMUM_WAIT_EVENTS]; int g_nBufferCount; PBUFFER_OBJ g_pBufferHeader,g_pBufferTail; BOOL g_bServerRunning; CRITICAL_SECTION g_cs; PBUFFER_OBJ CreateBufferObj(PSOCKET_OBJ pSocket,ULONG nLen) { if(g_nBufferCount WSA_MAXIMUM_WAIT_EVENTS - 1) return NULL; PBUFFER_OBJ pBuffer new BUFFER_OBJ(); if(pBuffer ! NULL) { pBuffer-buff new char[nLen]; pBuffer-nLen nLen; pBuffer-ol.hEvent WSACreateEvent(); pBuffer-pSocket pSocket; pBuffer-sAccept INVALID_SOCKET; pBuffer-pNext NULL; EnterCriticalSection(g_cs); if(g_pBufferHeader NULL) { g_pBufferHeader g_pBufferTail pBuffer; } else { g_pBufferTail-pNext pBuffer; g_pBufferTail pBuffer; } LeaveCriticalSection(g_cs); g_events[g_nBufferCount] pBuffer-ol.hEvent; } return pBuffer; } void FreeBufferObj(PBUFFER_OBJ pBuffer) { EnterCriticalSection(g_cs); PBUFFER_OBJ pTest g_pBufferHeader; BOOL bFind FALSE; if(pTest pBuffer) { if(g_pBufferHeader g_pBufferTail) g_pBufferHeader g_pBufferTail NULL; else g_pBufferHeader g_pBufferHeader-pNext; bFind TRUE; } else { while(pTest ! NULL pTest-pNext ! pBuffer) pTest pTest-pNext; if(pTest ! NULL) { pTest-pNext pBuffer-pNext; if(pTest-pNext NULL) g_pBufferTail pTest; bFind TRUE; } } if(bFind) { g_nBufferCount--; WSACloseEvent(pBuffer-ol.hEvent); delete [] pBuffer-buff; delete pBuffer; } LeaveCriticalSection(g_cs); } PBUFFER_OBJ FindBufferObj(HANDLE hEvent) { if(hEvent NULL || hEvent INVALID_HANDLE_VALUE) return NULL; EnterCriticalSection(g_cs); PBUFFER_OBJ pTest g_pBufferHeader; while(pTest ! NULL pTest-ol.hEvent ! hEvent) pTest pTest-pNext; LeaveCriticalSection(g_cs); return pTest; } void RebuildArray() { EnterCriticalSection(g_cs); PBUFFER_OBJ pBuffer g_pBufferHeader; int i1; while(pBuffer ! NULL) { g_events[i] pBuffer-ol.hEvent; pBuffer pBuffer-pNext; } LeaveCriticalSection(g_cs); } BOOL PostAccept(PBUFFER_OBJ pBuffer) { PSOCKET_OBJ pSocket pBuffer-pSocket; if(pSocket-lpfnAcceptEx ! NULL) { pBuffer-nOperation OP_ACCEPT; pSocket-nOutstandingOps; DWORD dwBytes; pBuffer-sAccept WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED); BOOL b pSocket-lpfnAcceptEx(pSocket-s, pBuffer-sAccept,pBuffer-buff,BUFFER_SIZE - ((sizeof(sockaddr_in) 16)*2), sizeof(sockaddr_in) 16, sizeof(sockaddr_in) 16,dwBytes,pBuffer-ol); if(!b) { if(WSAGetLastError() ! WSA_IO_PENDING) return FALSE; } return TRUE; } return FALSE; } BOOL PostRecv(PBUFFER_OBJ pBuffer) { pBuffer-nOperation OP_READ; pBuffer-pSocket-nOutstandingOps; DWORD dwBytes; DWORD dwFlags 0; WSABUF buf; buf.buf pBuffer-buff; buf.len pBuffer-nLen; if(WSARecv(pBuffer-pSocket-s,buf,1,dwBytes,dwFlags,pBuffer-ol,NULL)) { if(WSAGetLastError() ! WSA_IO_PENDING) return FALSE; } return TRUE; } BOOL PostSend(PBUFFER_OBJ pBuffer) { pBuffer-nOperation OP_WRITE; pBuffer-pSocket-nOutstandingOps; DWORD dwBytes; DWORD dwFlags 0; WSABUF buf; buf.buf pBuffer-buff; buf.len pBuffer-nLen; if(WSASend(pBuffer-pSocket-s,buf,1,dwBytes,dwFlags,pBuffer-ol,NULL)) { if(WSAGetLastError() ! WSA_IO_PENDING) return FALSE; } return TRUE; } BOOL HandleIo(PBUFFER_OBJ pBuffer) { if(pBuffer NULL) return FALSE; PSOCKET_OBJ pSocket pBuffer-pSocket; pSocket-nOutstandingOps--; DWORD dwTrans; DWORD dwFlags; BOOL bRet WSAGetOverlappedResult(pSocket-s,pBuffer-ol,dwTrans,FALSE,dwFlags); if(!bRet) { if(pSocket-s ! INVALID_SOCKET) { closesocket(pSocket-s); pSocket-s INVALID_SOCKET; } if(pBuffer-nOperation OP_ACCEPT pBuffer-sAccept ! INVALID_SOCKET) { closesocket(pBuffer-sAccept); pBuffer-sAccept INVALID_SOCKET; } if(pSocket-nOutstandingOps 0) { FreeSocketObj(pSocket); } FreeBufferObj(pBuffer); return FALSE; } switch(pBuffer-nOperation) { case OP_ACCEPT: { if(dwTrans 0) { pBuffer-buff[dwTrans] 0; printf(Accept收到数据%s\n,pBuffer-buff); PSOCKET_OBJ pClient CreateSocketObj(pBuffer-sAccept); PBUFFER_OBJ pRecv CreateBufferObj(pClient,BUFFER_SIZE); if(pRecv NULL) { printf(Too much connections!\n); FreeSocketObj(pClient); return FALSE; } RebuildArray(); if(!PostRecv(pRecv)) { FreeSocketObj(pClient); FreeBufferObj(pBuffer); return FALSE; } } else { if(pSocket-s ! INVALID_SOCKET) { closesocket(pSocket-s); pSocket-s INVALID_SOCKET; } if(pBuffer-sAccept ! INVALID_SOCKET) { closesocket(pBuffer-sAccept); pBuffer-sAccept INVALID_SOCKET; } if(pSocket-nOutstandingOps 0) { FreeSocketObj(pSocket); } FreeBufferObj(pBuffer); } // PBUFFER_OBJ pSend CreateBufferObj(pClient,BUFFER_SIZE); //if(pSend NULL) //{ // printf(Too much connections!\n); // FreeSocketObj(pClient); // return FALSE; //} //RebuildArray(); //pSend-nLen dwTrans; //memcpy(pSend-buff,pBuffer-buff,dwTrans); //if(!PostSend(pSend)) //{ // FreeSocketObj(pSocket); // FreeBufferObj(pBuffer); // return FALSE; //} PostAccept(pBuffer); }break; case OP_READ: { if(dwTrans 0) { pBuffer-buff[dwTrans] 0; printf(Recv收到数据%s\n,pBuffer-buff); PostRecv(pBuffer); } else { if(pSocket-s ! INVALID_SOCKET) { closesocket(pSocket-s); pSocket-s INVALID_SOCKET; } if(pSocket-nOutstandingOps 0) { FreeSocketObj(pSocket); } FreeBufferObj(pBuffer); } }break; case OP_WRITE: { if(dwTrans 0) { pBuffer-buff[dwTrans] 0; printf(发送数据: %s 成功\n,pBuffer-buff); FreeBufferObj(pBuffer); } else { if(pSocket-s ! INVALID_SOCKET) { closesocket(pSocket-s); pSocket-s INVALID_SOCKET; } if(pSocket-nOutstandingOps 0) { FreeSocketObj(pSocket); } FreeBufferObj(pBuffer); } }break; } } DWORD WINAPI ControlThread(LPVOID lpParma) { char cmd[128]; while(scanf(%s,cmd)) { if(cmd[0] s) { g_bServerRunning FALSE; EnterCriticalSection(g_cs); PBUFFER_OBJ pBuffer g_pBufferHeader; while(pBuffer ! NULL) { if(pBuffer-pSocket ! NULL pBuffer-pSocket-s ! INVALID_SOCKET) closesocket(pBuffer-pSocket-s); pBuffer pBuffer-pNext; } LeaveCriticalSection(g_cs); break; } } return 0; } int main() { InitializeCriticalSectionAndSpinCount(g_cs,4000); WSAData wsaData; if(0 ! WSAStartup(MAKEWORD(2,2),wsaData)) { printf(初始化失败!%d\n,WSAGetLastError()); Sleep(5000); return -1; } USHORT nport 3456; SOCKET sListen socket(AF_INET,SOCK_STREAM,IPPROTO_TCP); sockaddr_in sin; sin.sin_family AF_INET; sin.sin_port htons(nport); sin.sin_addr.S_un.S_addr ADDR_ANY; if(SOCKET_ERROR bind(sListen,(sockaddr*)sin,sizeof(sin))) { printf(bind failed!%d\n,WSAGetLastError()); Sleep(5000); return -1; } listen(sListen,200); g_bServerRunning TRUE; PSOCKET_OBJ pListen CreateSocketObj(sListen); GUID GuidAcceptEx WSAID_ACCEPTEX; DWORD dwBytes; WSAIoctl(pListen-s, SIO_GET_EXTENSION_FUNCTION_POINTER, GuidAcceptEx, sizeof(GuidAcceptEx), pListen-lpfnAcceptEx, sizeof(pListen-lpfnAcceptEx), dwBytes, NULL, NULL); g_events[0] WSACreateEvent(); for(int i0;i5;i) { PostAccept(CreateBufferObj(pListen,BUFFER_SIZE)); } HANDLE hThread CreateThread(NULL,0,ControlThread,NULL,0,NULL); while(TRUE) { int nIndex WSAWaitForMultipleEvents(g_nBufferCount1,g_events,FALSE,WSA_INFINITE,FALSE); if(nIndex WSA_WAIT_FAILED) { printf(WSAWaitForMultipleEvents Failed!\n); break; } nIndex nIndex - WSA_WAIT_EVENT_0; for(int inIndex;i g_nBufferCount;i) { int nRet WSAWaitForMultipleEvents(1,g_events[i],FALSE,0,FALSE); if(nRet WSA_WAIT_TIMEOUT) continue; if(i 0) { RebuildArray(); continue; } PBUFFER_OBJ pBuffer FindBufferObj(g_events[i]); if(pBuffer ! NULL) { if(!HandleIo(pBuffer)) RebuildArray(); } } if(!g_bServerRunning g_nBufferCount 0) break; } WSACloseEvent(g_events[0]); WaitForSingleObject(hThread,INFINITE); CloseHandle(hThread); closesocket(sListen); WSACleanup(); DeleteCriticalSection(g_cs); return 0; } #include WinSock2.h
#include Windows.h
#include MSWSock.h
#include stdio.h
#pragma comment(lib,Ws2_32.lib)
#define BUFFER_SIZE 4096
typedef struct _SOCKET_OBJ
{
SOCKET s;
int nOutstandingOps;
LPFN_ACCEPTEX lpfnAcceptEx;
}SOCKET_OBJ,*PSOCKET_OBJ;
PSOCKET_OBJ CreateSocketObj(SOCKET s)
{
PSOCKET_OBJ pSocket new SOCKET_OBJ();
if(pSocket ! NULL)
pSocket-s s;
return pSocket;
}
void FreeSocketObj(PSOCKET_OBJ pSocket)
{
if(pSocket NULL)
return;
if(pSocket-s ! INVALID_SOCKET)
closesocket(pSocket-s);
delete pSocket;
}
typedef struct _BUFFER_OBJ
{
OVERLAPPED ol;
char* buff;
int nLen;
PSOCKET_OBJ pSocket;
int nOperation;
#define OP_ACCEPT 1
#define OP_READ 2
#define OP_WRITE 3
SOCKET sAccept;
_BUFFER_OBJ* pNext;
}BUFFER_OBJ,*PBUFFER_OBJ;
HANDLE g_events[WSA_MAXIMUM_WAIT_EVENTS];
int g_nBufferCount;
PBUFFER_OBJ g_pBufferHeader,g_pBufferTail;
BOOL g_bServerRunning;
CRITICAL_SECTION g_cs;
PBUFFER_OBJ CreateBufferObj(PSOCKET_OBJ pSocket,ULONG nLen)
{
if(g_nBufferCount WSA_MAXIMUM_WAIT_EVENTS - 1)
return NULL;
PBUFFER_OBJ pBuffer new BUFFER_OBJ();
if(pBuffer ! NULL)
{
pBuffer-buff new char[nLen];
pBuffer-nLen nLen;
pBuffer-ol.hEvent WSACreateEvent();
pBuffer-pSocket pSocket;
pBuffer-sAccept INVALID_SOCKET;
pBuffer-pNext NULL;
EnterCriticalSection(g_cs);
if(g_pBufferHeader NULL)
{
g_pBufferHeader g_pBufferTail pBuffer;
}
else
{
g_pBufferTail-pNext pBuffer;
g_pBufferTail pBuffer;
}
LeaveCriticalSection(g_cs);
g_events[g_nBufferCount] pBuffer-ol.hEvent;
}
return pBuffer;
}
void FreeBufferObj(PBUFFER_OBJ pBuffer)
{
EnterCriticalSection(g_cs);
PBUFFER_OBJ pTest g_pBufferHeader;
BOOL bFind FALSE;
if(pTest pBuffer)
{
if(g_pBufferHeader g_pBufferTail)
g_pBufferHeader g_pBufferTail NULL;
else
g_pBufferHeader g_pBufferHeader-pNext;
bFind TRUE;
}
else
{
while(pTest ! NULL pTest-pNext ! pBuffer)
pTest pTest-pNext;
if(pTest ! NULL)
{
pTest-pNext pBuffer-pNext;
if(pTest-pNext NULL)
g_pBufferTail pTest;
bFind TRUE;
}
}
if(bFind)
{
g_nBufferCount--;
WSACloseEvent(pBuffer-ol.hEvent);
delete [] pBuffer-buff;
delete pBuffer;
}
LeaveCriticalSection(g_cs);
}
PBUFFER_OBJ FindBufferObj(HANDLE hEvent)
{
if(hEvent NULL || hEvent INVALID_HANDLE_VALUE)
return NULL;
EnterCriticalSection(g_cs);
PBUFFER_OBJ pTest g_pBufferHeader;
while(pTest ! NULL pTest-ol.hEvent ! hEvent)
pTest pTest-pNext;
LeaveCriticalSection(g_cs);
return pTest;
}
void RebuildArray()
{
EnterCriticalSection(g_cs);
PBUFFER_OBJ pBuffer g_pBufferHeader;
int i1;
while(pBuffer ! NULL)
{
g_events[i] pBuffer-ol.hEvent;
pBuffer pBuffer-pNext;
}
LeaveCriticalSection(g_cs);
}
BOOL PostAccept(PBUFFER_OBJ pBuffer)
{
PSOCKET_OBJ pSocket pBuffer-pSocket;
if(pSocket-lpfnAcceptEx ! NULL)
{
pBuffer-nOperation OP_ACCEPT;
pSocket-nOutstandingOps;
DWORD dwBytes;
pBuffer-sAccept WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);
BOOL b pSocket-lpfnAcceptEx(pSocket-s,
pBuffer-sAccept,pBuffer-buff,BUFFER_SIZE - ((sizeof(sockaddr_in) 16)*2),
sizeof(sockaddr_in) 16, sizeof(sockaddr_in) 16,dwBytes,pBuffer-ol);
if(!b)
{
if(WSAGetLastError() ! WSA_IO_PENDING)
return FALSE;
}
return TRUE;
}
return FALSE;
}
BOOL PostRecv(PBUFFER_OBJ pBuffer)
{
pBuffer-nOperation OP_READ;
pBuffer-pSocket-nOutstandingOps;
DWORD dwBytes;
DWORD dwFlags 0;
WSABUF buf;
buf.buf pBuffer-buff;
buf.len pBuffer-nLen;
if(WSARecv(pBuffer-pSocket-s,buf,1,dwBytes,dwFlags,pBuffer-ol,NULL))
{
if(WSAGetLastError() ! WSA_IO_PENDING)
return FALSE;
}
return TRUE;
}
BOOL PostSend(PBUFFER_OBJ pBuffer)
{
pBuffer-nOperation OP_WRITE;
pBuffer-pSocket-nOutstandingOps;
DWORD dwBytes;
DWORD dwFlags 0;
WSABUF buf;
buf.buf pBuffer-buff;
buf.len pBuffer-nLen;
if(WSASend(pBuffer-pSocket-s,buf,1,dwBytes,dwFlags,pBuffer-ol,NULL))
{
if(WSAGetLastError() ! WSA_IO_PENDING)
return FALSE;
}
return TRUE;
}
BOOL HandleIo(PBUFFER_OBJ pBuffer)
{
if(pBuffer NULL)
return FALSE;
PSOCKET_OBJ pSocket pBuffer-pSocket;
pSocket-nOutstandingOps--;
DWORD dwTrans;
DWORD dwFlags;
BOOL bRet WSAGetOverlappedResult(pSocket-s,pBuffer-ol,dwTrans,FALSE,dwFlags);
if(!bRet)
{
if(pSocket-s ! INVALID_SOCKET)
{
closesocket(pSocket-s);
pSocket-s INVALID_SOCKET;
}
if(pBuffer-nOperation OP_ACCEPT pBuffer-sAccept ! INVALID_SOCKET)
{
closesocket(pBuffer-sAccept);
pBuffer-sAccept INVALID_SOCKET;
}
if(pSocket-nOutstandingOps 0)
{
FreeSocketObj(pSocket);
}
FreeBufferObj(pBuffer);
return FALSE;
}
switch(pBuffer-nOperation)
{
case OP_ACCEPT:
{
if(dwTrans 0)
{
pBuffer-buff[dwTrans] 0;
printf(Accept收到数据%s\n,pBuffer-buff);
PSOCKET_OBJ pClient CreateSocketObj(pBuffer-sAccept);
PBUFFER_OBJ pRecv CreateBufferObj(pClient,BUFFER_SIZE);
if(pRecv NULL)
{
printf(Too much connections!\n);
FreeSocketObj(pClient);
return FALSE;
}
RebuildArray();
if(!PostRecv(pRecv))
{
FreeSocketObj(pClient);
FreeBufferObj(pBuffer);
return FALSE;
}
}
else
{
if(pSocket-s ! INVALID_SOCKET)
{
closesocket(pSocket-s);
pSocket-s INVALID_SOCKET;
}
if(pBuffer-sAccept ! INVALID_SOCKET)
{
closesocket(pBuffer-sAccept);
pBuffer-sAccept INVALID_SOCKET;
}
if(pSocket-nOutstandingOps 0)
{
FreeSocketObj(pSocket);
}
FreeBufferObj(pBuffer);
}
// PBUFFER_OBJ pSend CreateBufferObj(pClient,BUFFER_SIZE);
//if(pSend NULL)
//{
// printf(Too much connections!\n);
// FreeSocketObj(pClient);
// return FALSE;
//}
//RebuildArray();
//pSend-nLen dwTrans;
//memcpy(pSend-buff,pBuffer-buff,dwTrans);
//if(!PostSend(pSend))
//{
// FreeSocketObj(pSocket);
// FreeBufferObj(pBuffer);
// return FALSE;
//}
PostAccept(pBuffer);
}break;
case OP_READ:
{
if(dwTrans 0)
{
pBuffer-buff[dwTrans] 0;
printf(Recv收到数据%s\n,pBuffer-buff);
PostRecv(pBuffer);
}
else
{
if(pSocket-s ! INVALID_SOCKET)
{
closesocket(pSocket-s);
pSocket-s INVALID_SOCKET;
}
if(pSocket-nOutstandingOps 0)
{
FreeSocketObj(pSocket);
}
FreeBufferObj(pBuffer);
}
}break;
case OP_WRITE:
{
if(dwTrans 0)
{
pBuffer-buff[dwTrans] 0;
printf(发送数据: %s 成功\n,pBuffer-buff);
FreeBufferObj(pBuffer);
}
else
{
if(pSocket-s ! INVALID_SOCKET)
{
closesocket(pSocket-s);
pSocket-s INVALID_SOCKET;
}
if(pSocket-nOutstandingOps 0)
{
FreeSocketObj(pSocket);
}
FreeBufferObj(pBuffer);
}
}break;
}
}
DWORD WINAPI ControlThread(LPVOID lpParma)
{
char cmd[128];
while(scanf(%s,cmd))
{
if(cmd[0] s)
{
g_bServerRunning FALSE;
EnterCriticalSection(g_cs);
PBUFFER_OBJ pBuffer g_pBufferHeader;
while(pBuffer ! NULL)
{
if(pBuffer-pSocket ! NULL pBuffer-pSocket-s ! INVALID_SOCKET)
closesocket(pBuffer-pSocket-s);
pBuffer pBuffer-pNext;
}
LeaveCriticalSection(g_cs);
break;
}
}
return 0;
}
int main()
{
InitializeCriticalSectionAndSpinCount(g_cs,4000);
WSAData wsaData;
if(0 ! WSAStartup(MAKEWORD(2,2),wsaData))
{
printf(初始化失败!%d\n,WSAGetLastError());
Sleep(5000);
return -1;
}
USHORT nport 3456;
SOCKET sListen socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
sockaddr_in sin;
sin.sin_family AF_INET;
sin.sin_port htons(nport);
sin.sin_addr.S_un.S_addr ADDR_ANY;
if(SOCKET_ERROR bind(sListen,(sockaddr*)sin,sizeof(sin)))
{
printf(bind failed!%d\n,WSAGetLastError());
Sleep(5000);
return -1;
}
listen(sListen,200);
g_bServerRunning TRUE;
PSOCKET_OBJ pListen CreateSocketObj(sListen);
GUID GuidAcceptEx WSAID_ACCEPTEX;
DWORD dwBytes;
WSAIoctl(pListen-s,
SIO_GET_EXTENSION_FUNCTION_POINTER,
GuidAcceptEx,
sizeof(GuidAcceptEx),
pListen-lpfnAcceptEx,
sizeof(pListen-lpfnAcceptEx),
dwBytes,
NULL,
NULL);
g_events[0] WSACreateEvent();
for(int i0;i5;i)
{
PostAccept(CreateBufferObj(pListen,BUFFER_SIZE));
}
HANDLE hThread CreateThread(NULL,0,ControlThread,NULL,0,NULL);
while(TRUE)
{
int nIndex WSAWaitForMultipleEvents(g_nBufferCount1,g_events,FALSE,WSA_INFINITE,FALSE);
if(nIndex WSA_WAIT_FAILED)
{
printf(WSAWaitForMultipleEvents Failed!\n);
break;
}
nIndex nIndex - WSA_WAIT_EVENT_0;
for(int inIndex;i g_nBufferCount;i)
{
int nRet WSAWaitForMultipleEvents(1,g_events[i],FALSE,0,FALSE);
if(nRet WSA_WAIT_TIMEOUT)
continue;
if(i 0)
{
RebuildArray();
continue;
}
PBUFFER_OBJ pBuffer FindBufferObj(g_events[i]);
if(pBuffer ! NULL)
{
if(!HandleIo(pBuffer))
RebuildArray();
}
}
if(!g_bServerRunning g_nBufferCount 0)
break;
}
WSACloseEvent(g_events[0]);
WaitForSingleObject(hThread,INFINITE);
CloseHandle(hThread);
closesocket(sListen);
WSACleanup();
DeleteCriticalSection(g_cs);
return 0;
}七IOCP。
大框架为书中例子对强化了发送操作部分异常处理且加入了连接超时处理。
注意当一个投递完成且对应socket上已经没有未决的投递必须要再投递一个请求或者关闭连接否则socket对应的数据结构无法被释放对应socket连接断开时也无法被
检测到。所以如果业务逻辑结束要关闭连接。或者你需要等客户端来断开连接那么你可以在业务逻辑结束后再投递一个接收请求客户端断开时接收请求返回且接收的字节数为0则此类中的异常处理逻辑便会将资源清理掉。 头文件 [cpp] view plaincopyprint? // IOCP.h文件 #ifndef __IOCP_H__ #define __IOCP_H__ #include winsock2.h #include windows.h #include Mswsock.h #define BUFFER_SIZE 1024*4 // I/O请求的缓冲区大小 #define MAX_THREAD 1 // I/O服务线程的数量 // 这是per-I/O数据。它包含了在套节字上处理I/O操作的必要信息 struct CIOCPBuffer { CIOCPBuffer() { memset(ol,0,sizeof(WSAOVERLAPPED)); sClient INVALID_SOCKET; memset(buff,0,BUFFER_SIZE); nLen 0; nSequenceNumber 0; bIsReleased FALSE; nOperation 0; pNext NULL; } WSAOVERLAPPED ol; SOCKET sClient; // AcceptEx接收的客户方套节字 char buff[BUFFER_SIZE]; // I/O操作使用的缓冲区 int nLen; // buff缓冲区使用的大小 ULONG nSequenceNumber; // 此I/O的序列号 BOOL bIsReleased; int nOperation; // 操作类型 #define OP_ACCEPT 1 #define OP_WRITE 2 #define OP_READ 3 CIOCPBuffer *pNext; }; struct CIOCPNextToSend; struct CIOCPTimerData; // 这是per-Handle数据。它包含了一个套节字的信息 struct CIOCPContext { CIOCPContext() { s INVALID_SOCKET; memset(addrLocal,0,sizeof(SOCKADDR_IN)); memset(addrRemote,0,sizeof(SOCKADDR_IN)); bClosing FALSE; nOutstandingRecv 0; nOutstandingSend 0; nReadSequence 0; nCurrentReadSequence 0; nCurrentStep 0; pOutOfOrderReads NULL; pNextToSend NULL; bIsReleased FALSE; pNext NULL; pPreData NULL; strcpy(szClientName,); hTimer NULL; hCompletion NULL; } CIOCPBuffer m_pBuffer; SOCKET s; // 套节字句柄 SOCKADDR_IN addrLocal; // 连接的本地地址 SOCKADDR_IN addrRemote; // 连接的远程地址 BOOL bClosing; // 套节字是否关闭 int nOutstandingRecv; // 此套节字上抛出的重叠操作的数量 int nOutstandingSend; ULONG nReadSequence; // 安排给接收的下一个序列号 ULONG nCurrentReadSequence; // 当前要读的序列号 CIOCPBuffer *pOutOfOrderReads; // 记录没有按顺序完成的读I/O CIOCPNextToSend *pNextToSend; //xss,按顺序发送的下一个要发送的。 LPVOID pPreData; //xss用于2个过程之间的数据交流。 ULONG nCurrentStep;//xss,用于记录当前处于的过程步骤数。 BOOL bIsReleased; CRITICAL_SECTION Lock; // 保护这个结构 CIOCPContext *pNext; char szClientName[256];//xss HANDLE hTimer;//xss HANDLE hCompletion;//xss }; struct CIOCPNextToSend//xss { CIOCPBuffer * pBuffer; CIOCPNextToSend * pNext; }; struct CIOCPTimerData { CIOCPContext* pContext; HANDLE hCompletion; }; class CIOCPServer // 处理线程 { public: CIOCPServer(); ~CIOCPServer(); // 开始服务 BOOL Start(int nPort 3456, int nMaxConnections 2000, int nMaxFreeBuffers 200, int nMaxFreeContexts 100, int nInitialReads 4); // 停止服务 void Shutdown(); // 关闭一个连接和关闭所有连接 void CloseAConnection(CIOCPContext *pContext); void CloseAllConnections(); // 取得当前的连接数量 ULONG GetCurrentConnection() { return m_nCurrentConnection; } // 向指定客户发送文本 BOOL SendText(CIOCPContext *pContext, char *pszText, int nLen); protected: // 申请和释放缓冲区对象 CIOCPBuffer *AllocateBuffer(int nLen); void ReleaseBuffer(CIOCPBuffer *pBuffer); // 申请和释放套节字上下文 CIOCPContext *AllocateContext(SOCKET s); void ReleaseContext(CIOCPContext *pContext); // 释放空闲缓冲区对象列表和空闲上下文对象列表 void FreeBuffers(); void FreeContexts(); // 向连接列表中添加一个连接 BOOL AddAConnection(CIOCPContext *pContext); // 插入和移除未决的接受请求 BOOL InsertPendingAccept(CIOCPBuffer *pBuffer); BOOL RemovePendingAccept(CIOCPBuffer *pBuffer); //xss,把要发送的数据加入队列按顺序发送 BOOL PostSendToList(CIOCPContext *pContext, CIOCPBuffer *pBuffer); //xss,发送下一个需要发送的 BOOL PostNextWriteBuffer(CIOCPContext *pContext, CIOCPBuffer *pBuffer); // 取得下一个要读取的 CIOCPBuffer *GetNextReadBuffer(CIOCPContext *pContext, CIOCPBuffer *pBuffer); void ErrorHandle(CIOCPContext *pContext, CIOCPBuffer *pBuffer);//xss,错误集中处理 // 投递接受I/O、发送I/O、接收I/O BOOL PostAccept(CIOCPBuffer *pBuffer); BOOL PostSend(CIOCPContext *pContext, CIOCPBuffer *pBuffer); BOOL PostRecv(CIOCPContext *pContext, CIOCPBuffer *pBuffer); BOOL PostRecv2(CIOCPContext *pContext, CIOCPBuffer *pBuffer); void HandleIO(DWORD dwKey, CIOCPBuffer *pBuffer, DWORD dwTrans, int nError); // 事件通知函数 // 建立了一个新的连接 virtual void OnConnectionEstablished(CIOCPContext *pContext, CIOCPBuffer *pBuffer); // 一个连接关闭 virtual void OnConnectionClosing(CIOCPContext *pContext, CIOCPBuffer *pBuffer); // 在一个连接上发生了错误 virtual void OnConnectionError(CIOCPContext *pContext, CIOCPBuffer *pBuffer, int nError); // 一个连接上的读操作完成 virtual void OnReadCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer); // 一个连接上的写操作完成 virtual void OnWriteCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer); protected: // 记录空闲结构信息 CIOCPBuffer *m_pFreeBufferList; CIOCPContext *m_pFreeContextList; int m_nFreeBufferCount; int m_nFreeContextCount; CRITICAL_SECTION m_FreeBufferListLock; CRITICAL_SECTION m_FreeContextListLock; CRITICAL_SECTION m_HeapLock; CRITICAL_SECTION m_RepostLock; // 记录抛出的Accept请求 CIOCPBuffer *m_pPendingAccepts; // 抛出请求列表。 long m_nPendingAcceptCount; CRITICAL_SECTION m_PendingAcceptsLock; // 记录连接列表 CIOCPContext *m_pConnectionList; int m_nCurrentConnection; CRITICAL_SECTION m_ConnectionListLock; // 用于投递Accept请求 HANDLE m_hAcceptEvent; HANDLE m_hRepostEvent; LONG m_nRepostCount; int m_nPort; // 服务器监听的端口 int m_nInitialAccepts; int m_nInitialReads; int m_nMaxAccepts; int m_nMaxSends; int m_nMaxFreeBuffers; int m_nMaxFreeContexts; int m_nMaxConnections; HANDLE m_hListenThread; // 监听线程 HANDLE m_hCompletion; // 完成端口句柄 SOCKET m_sListen; // 监听套节字句柄 LPFN_ACCEPTEX m_lpfnAcceptEx; // AcceptEx函数地址 LPFN_GETACCEPTEXSOCKADDRS m_lpfnGetAcceptExSockaddrs; // GetAcceptExSockaddrs函数地址 BOOL m_bShutDown; // 用于通知监听线程退出 BOOL m_bServerStarted; // 记录服务是否启动 HANDLE m_hTimerQueue;//xss private: // 线程函数 static DWORD WINAPI _ListenThreadProc(LPVOID lpParam); static DWORD WINAPI _WorkerThreadProc(LPVOID lpParam); }; #endif // __IOCP_H__ // IOCP.h文件
#ifndef __IOCP_H__
#define __IOCP_H__
#include winsock2.h
#include windows.h
#include Mswsock.h
#define BUFFER_SIZE 1024*4 // I/O请求的缓冲区大小
#define MAX_THREAD 1 // I/O服务线程的数量
// 这是per-I/O数据。它包含了在套节字上处理I/O操作的必要信息
struct CIOCPBuffer
{
CIOCPBuffer()
{
memset(ol,0,sizeof(WSAOVERLAPPED));
sClient INVALID_SOCKET;
memset(buff,0,BUFFER_SIZE);
nLen 0;
nSequenceNumber 0;
bIsReleased FALSE;
nOperation 0;
pNext NULL;
}
WSAOVERLAPPED ol;
SOCKET sClient; // AcceptEx接收的客户方套节字
char buff[BUFFER_SIZE]; // I/O操作使用的缓冲区
int nLen; // buff缓冲区使用的大小
ULONG nSequenceNumber; // 此I/O的序列号
BOOL bIsReleased;
int nOperation; // 操作类型
#define OP_ACCEPT 1
#define OP_WRITE 2
#define OP_READ 3
CIOCPBuffer *pNext;
};
struct CIOCPNextToSend;
struct CIOCPTimerData;
// 这是per-Handle数据。它包含了一个套节字的信息
struct CIOCPContext
{
CIOCPContext()
{
s INVALID_SOCKET;
memset(addrLocal,0,sizeof(SOCKADDR_IN));
memset(addrRemote,0,sizeof(SOCKADDR_IN));
bClosing FALSE;
nOutstandingRecv 0;
nOutstandingSend 0;
nReadSequence 0;
nCurrentReadSequence 0;
nCurrentStep 0;
pOutOfOrderReads NULL;
pNextToSend NULL;
bIsReleased FALSE;
pNext NULL;
pPreData NULL;
strcpy(szClientName,);
hTimer NULL;
hCompletion NULL;
}
CIOCPBuffer m_pBuffer;
SOCKET s; // 套节字句柄
SOCKADDR_IN addrLocal; // 连接的本地地址
SOCKADDR_IN addrRemote; // 连接的远程地址
BOOL bClosing; // 套节字是否关闭
int nOutstandingRecv; // 此套节字上抛出的重叠操作的数量
int nOutstandingSend;
ULONG nReadSequence; // 安排给接收的下一个序列号
ULONG nCurrentReadSequence; // 当前要读的序列号
CIOCPBuffer *pOutOfOrderReads; // 记录没有按顺序完成的读I/O
CIOCPNextToSend *pNextToSend; //xss,按顺序发送的下一个要发送的。
LPVOID pPreData; //xss用于2个过程之间的数据交流。
ULONG nCurrentStep;//xss,用于记录当前处于的过程步骤数。
BOOL bIsReleased;
CRITICAL_SECTION Lock; // 保护这个结构
CIOCPContext *pNext;
char szClientName[256];//xss
HANDLE hTimer;//xss
HANDLE hCompletion;//xss
};
struct CIOCPNextToSend//xss
{
CIOCPBuffer * pBuffer;
CIOCPNextToSend * pNext;
};
struct CIOCPTimerData
{
CIOCPContext* pContext;
HANDLE hCompletion;
};
class CIOCPServer // 处理线程
{
public:
CIOCPServer();
~CIOCPServer();
// 开始服务
BOOL Start(int nPort 3456, int nMaxConnections 2000,
int nMaxFreeBuffers 200, int nMaxFreeContexts 100, int nInitialReads 4);
// 停止服务
void Shutdown();
// 关闭一个连接和关闭所有连接
void CloseAConnection(CIOCPContext *pContext);
void CloseAllConnections();
// 取得当前的连接数量
ULONG GetCurrentConnection() { return m_nCurrentConnection; }
// 向指定客户发送文本
BOOL SendText(CIOCPContext *pContext, char *pszText, int nLen);
protected:
// 申请和释放缓冲区对象
CIOCPBuffer *AllocateBuffer(int nLen);
void ReleaseBuffer(CIOCPBuffer *pBuffer);
// 申请和释放套节字上下文
CIOCPContext *AllocateContext(SOCKET s);
void ReleaseContext(CIOCPContext *pContext);
// 释放空闲缓冲区对象列表和空闲上下文对象列表
void FreeBuffers();
void FreeContexts();
// 向连接列表中添加一个连接
BOOL AddAConnection(CIOCPContext *pContext);
// 插入和移除未决的接受请求
BOOL InsertPendingAccept(CIOCPBuffer *pBuffer);
BOOL RemovePendingAccept(CIOCPBuffer *pBuffer);
//xss,把要发送的数据加入队列按顺序发送
BOOL PostSendToList(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
//xss,发送下一个需要发送的
BOOL PostNextWriteBuffer(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
// 取得下一个要读取的
CIOCPBuffer *GetNextReadBuffer(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
void ErrorHandle(CIOCPContext *pContext, CIOCPBuffer *pBuffer);//xss,错误集中处理
// 投递接受I/O、发送I/O、接收I/O
BOOL PostAccept(CIOCPBuffer *pBuffer);
BOOL PostSend(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
BOOL PostRecv(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
BOOL PostRecv2(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
void HandleIO(DWORD dwKey, CIOCPBuffer *pBuffer, DWORD dwTrans, int nError);
// 事件通知函数
// 建立了一个新的连接
virtual void OnConnectionEstablished(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
// 一个连接关闭
virtual void OnConnectionClosing(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
// 在一个连接上发生了错误
virtual void OnConnectionError(CIOCPContext *pContext, CIOCPBuffer *pBuffer, int nError);
// 一个连接上的读操作完成
virtual void OnReadCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
// 一个连接上的写操作完成
virtual void OnWriteCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
protected:
// 记录空闲结构信息
CIOCPBuffer *m_pFreeBufferList;
CIOCPContext *m_pFreeContextList;
int m_nFreeBufferCount;
int m_nFreeContextCount;
CRITICAL_SECTION m_FreeBufferListLock;
CRITICAL_SECTION m_FreeContextListLock;
CRITICAL_SECTION m_HeapLock;
CRITICAL_SECTION m_RepostLock;
// 记录抛出的Accept请求
CIOCPBuffer *m_pPendingAccepts; // 抛出请求列表。
long m_nPendingAcceptCount;
CRITICAL_SECTION m_PendingAcceptsLock;
// 记录连接列表
CIOCPContext *m_pConnectionList;
int m_nCurrentConnection;
CRITICAL_SECTION m_ConnectionListLock;
// 用于投递Accept请求
HANDLE m_hAcceptEvent;
HANDLE m_hRepostEvent;
LONG m_nRepostCount;
int m_nPort; // 服务器监听的端口
int m_nInitialAccepts;
int m_nInitialReads;
int m_nMaxAccepts;
int m_nMaxSends;
int m_nMaxFreeBuffers;
int m_nMaxFreeContexts;
int m_nMaxConnections;
HANDLE m_hListenThread; // 监听线程
HANDLE m_hCompletion; // 完成端口句柄
SOCKET m_sListen; // 监听套节字句柄
LPFN_ACCEPTEX m_lpfnAcceptEx; // AcceptEx函数地址
LPFN_GETACCEPTEXSOCKADDRS m_lpfnGetAcceptExSockaddrs; // GetAcceptExSockaddrs函数地址
BOOL m_bShutDown; // 用于通知监听线程退出
BOOL m_bServerStarted; // 记录服务是否启动
HANDLE m_hTimerQueue;//xss
private: // 线程函数
static DWORD WINAPI _ListenThreadProc(LPVOID lpParam);
static DWORD WINAPI _WorkerThreadProc(LPVOID lpParam);
};
#endif // __IOCP_H__cpp文件 [cpp] view plaincopyprint? // // IOCP.cpp文件 #define _WIN32_WINNT 0x0500 //xss #include iocp.h #pragma comment(lib, WS2_32.lib) #include stdio.h #include httpFun.h static int iBufferCount 0; static int iContextCount 0; CIOCPServer::CIOCPServer() { // 列表 m_pFreeBufferList NULL; m_pFreeContextList NULL; m_pPendingAccepts NULL; m_pConnectionList NULL; m_nFreeBufferCount 0; m_nFreeContextCount 0; m_nPendingAcceptCount 0; m_nCurrentConnection 0; ::InitializeCriticalSection(m_FreeBufferListLock); ::InitializeCriticalSection(m_FreeContextListLock); ::InitializeCriticalSection(m_PendingAcceptsLock); ::InitializeCriticalSection(m_ConnectionListLock); ::InitializeCriticalSection(m_HeapLock); ::InitializeCriticalSection(m_RepostLock); // Accept请求 m_hAcceptEvent ::CreateEvent(NULL, FALSE, FALSE, NULL); m_hRepostEvent ::CreateEvent(NULL, FALSE, FALSE, NULL); m_nRepostCount 0; m_nPort 8888; m_nInitialAccepts 10; m_nInitialReads 4; m_nMaxAccepts 100; m_nMaxSends 20; m_nMaxFreeBuffers 200; m_nMaxFreeContexts 100; m_nMaxConnections 2000; m_hListenThread NULL; m_hCompletion NULL; m_sListen INVALID_SOCKET; m_lpfnAcceptEx NULL; m_lpfnGetAcceptExSockaddrs NULL; m_bShutDown FALSE; m_bServerStarted FALSE; m_hTimerQueue ::CreateTimerQueue(); // 初始化WS2_32.dll WSADATA wsaData; WORD sockVersion MAKEWORD(2, 2); ::WSAStartup(sockVersion, wsaData); } CIOCPServer::~CIOCPServer() { Shutdown(); if(m_sListen ! INVALID_SOCKET) ::closesocket(m_sListen); if(m_hListenThread ! NULL) ::CloseHandle(m_hListenThread); ::CloseHandle(m_hRepostEvent); ::CloseHandle(m_hAcceptEvent); ::DeleteCriticalSection(m_FreeBufferListLock); ::DeleteCriticalSection(m_FreeContextListLock); ::DeleteCriticalSection(m_PendingAcceptsLock); ::DeleteCriticalSection(m_ConnectionListLock); ::DeleteCriticalSection(m_HeapLock); ::DeleteCriticalSection(m_RepostLock); ::DeleteTimerQueue(m_hTimerQueue);//xss ::WSACleanup(); } /// static VOID CALLBACK TimerRoutine(PVOID lpParam, BOOLEAN TimerOrWaitFired) { CIOCPContext* pContext (CIOCPContext*)lpParam; if(pContext ! NULL pContext-bClosing FALSE) { EnterCriticalSection(pContext-Lock); if(pContext-hCompletion ! NULL) { PostQueuedCompletionStatus(pContext-hCompletion,-2,(ULONG_PTR)pContext,NULL); } LeaveCriticalSection(pContext-Lock); } } /// // 自定义帮助函数 CIOCPBuffer *CIOCPServer::AllocateBuffer(int nLen) { CIOCPBuffer *pBuffer NULL; if(nLen BUFFER_SIZE) return NULL; // 为缓冲区对象申请内存 ::EnterCriticalSection(m_FreeBufferListLock); if(m_pFreeBufferList NULL) // 内存池为空申请新的内存 { // pBuffer (CIOCPBuffer *)::HeapAlloc(GetProcessHeap(), // HEAP_ZERO_MEMORY, sizeof(CIOCPBuffer) BUFFER_SIZE); pBuffer new CIOCPBuffer(); } else // 从内存池中取一块来使用 { pBuffer m_pFreeBufferList; m_pFreeBufferList m_pFreeBufferList-pNext; pBuffer-pNext NULL; m_nFreeBufferCount --; } ::LeaveCriticalSection(m_FreeBufferListLock); EnterCriticalSection(m_HeapLock); iBufferCount; LeaveCriticalSection(m_HeapLock); // 初始化新的缓冲区对象 if(pBuffer ! NULL) { //pBuffer-buff (char*)(pBuffer sizeof(CIOCPBuffer)/*1*/);//xss,个人以为应该sizeof(CIOCPBuffer); pBuffer-nLen nLen; pBuffer-bIsReleased FALSE; } return pBuffer; } void CIOCPServer::ReleaseBuffer(CIOCPBuffer *pBuffer) { if(pBuffer NULL || pBuffer-bIsReleased) return; ::EnterCriticalSection(m_FreeBufferListLock); if(m_nFreeBufferCount m_nMaxFreeBuffers) // 将要释放的内存添加到空闲列表中 { memset(pBuffer, 0, sizeof(CIOCPBuffer) /* BUFFER_SIZE*/); pBuffer-pNext m_pFreeBufferList; m_pFreeBufferList pBuffer; m_nFreeBufferCount ; pBuffer-bIsReleased TRUE; } else // 已经达到最大值真正的释放内存 { //::HeapFree(::GetProcessHeap(), 0, pBuffer); delete pBuffer; } ::LeaveCriticalSection(m_FreeBufferListLock); EnterCriticalSection(m_HeapLock); iBufferCount--; LeaveCriticalSection(m_HeapLock); } CIOCPContext *CIOCPServer::AllocateContext(SOCKET s) { CIOCPContext *pContext; // 申请一个CIOCPContext对象 ::EnterCriticalSection(m_FreeContextListLock); if(m_pFreeContextList NULL) { //pContext (CIOCPContext *)::HeapAlloc(::GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(CIOCPContext)); pContext new CIOCPContext(); ::InitializeCriticalSection(pContext-Lock); } else { // 在空闲列表中申请 pContext m_pFreeContextList; m_pFreeContextList m_pFreeContextList-pNext; pContext-pNext NULL; m_nFreeBufferCount --; } ::LeaveCriticalSection(m_FreeContextListLock); EnterCriticalSection(m_HeapLock); iContextCount; LeaveCriticalSection(m_HeapLock); // 初始化对象成员 if(pContext ! NULL) { pContext-s s; pContext-bIsReleased FALSE; } return pContext; } void CIOCPServer::ReleaseContext(CIOCPContext *pContext) { if(pContext NULL || pContext-bIsReleased) return; printf(\n%s释放了Context\n\n,pContext-szClientName); if(pContext-s ! INVALID_SOCKET) ::closesocket(pContext-s); // 首先释放如果有的话此套节字上的没有按顺序完成的读I/O的缓冲区 CIOCPBuffer *pNext; while(pContext-pOutOfOrderReads ! NULL) { pNext pContext-pOutOfOrderReads-pNext; ReleaseBuffer(pContext-pOutOfOrderReads); pContext-pOutOfOrderReads pNext; } //xss,再释放如果有的话此套接字上未完成的写I/O缓冲区 CIOCPNextToSend* pSend NULL; while(pContext-pNextToSend ! NULL) { pSend pContext-pNextToSend-pNext; if(pContext-pNextToSend-pBuffer ! NULL pContext-pNextToSend-pBuffer-bIsReleased FALSE) { ReleaseBuffer(pContext-pNextToSend-pBuffer); } delete pContext-pNextToSend; pContext-pNextToSend pSend; } if(pContext-hTimer ! NULL) { DeleteTimerQueueTimer(m_hTimerQueue,pContext-hTimer,NULL); pContext-hTimer NULL; } ::EnterCriticalSection(m_FreeContextListLock); if(m_nFreeContextCount m_nMaxFreeContexts) // 添加到空闲列表 { // 先将关键代码段变量保存到一个临时变量中 CRITICAL_SECTION cstmp pContext-Lock; // 将要释放的上下文对象初始化为0 memset(pContext, 0, sizeof(CIOCPContext)); // 再放会关键代码段变量将要释放的上下文对象添加到空闲列表的表头 pContext-Lock cstmp; pContext-pNext m_pFreeContextList; m_pFreeContextList pContext; // 更新计数 m_nFreeContextCount ; pContext-bIsReleased TRUE; } else { ::DeleteCriticalSection(pContext-Lock); //::HeapFree(::GetProcessHeap(), 0, pContext); delete pContext; } ::LeaveCriticalSection(m_FreeContextListLock); EnterCriticalSection(m_HeapLock); iContextCount--; LeaveCriticalSection(m_HeapLock); } void CIOCPServer::FreeBuffers() { // 遍历m_pFreeBufferList空闲列表释放缓冲区池内存 ::EnterCriticalSection(m_FreeBufferListLock); CIOCPBuffer *pFreeBuffer m_pFreeBufferList; CIOCPBuffer *pNextBuffer; while(pFreeBuffer ! NULL) { pNextBuffer pFreeBuffer-pNext; delete pFreeBuffer; // if(!::HeapFree(::GetProcessHeap(), 0, pFreeBuffer)) // { // #ifdef _DEBUG // ::OutputDebugString( FreeBuffers释放内存出错); // #endif // _DEBUG // break; // } pFreeBuffer pNextBuffer; } m_pFreeBufferList NULL; m_nFreeBufferCount 0; ::LeaveCriticalSection(m_FreeBufferListLock); } void CIOCPServer::FreeContexts() { // 遍历m_pFreeContextList空闲列表释放缓冲区池内存 ::EnterCriticalSection(m_FreeContextListLock); CIOCPContext *pFreeContext m_pFreeContextList; CIOCPContext *pNextContext; while(pFreeContext ! NULL) { pNextContext pFreeContext-pNext; ::DeleteCriticalSection(pFreeContext-Lock); delete pFreeContext; // if(!::HeapFree(::GetProcessHeap(), 0, pFreeContext)) // { // #ifdef _DEBUG // ::OutputDebugString( FreeBuffers释放内存出错); // #endif // _DEBUG // break; // } pFreeContext pNextContext; } m_pFreeContextList NULL; m_nFreeContextCount 0; ::LeaveCriticalSection(m_FreeContextListLock); } BOOL CIOCPServer::AddAConnection(CIOCPContext *pContext) { // 向客户连接列表添加一个CIOCPContext对象 ::EnterCriticalSection(m_ConnectionListLock); if(m_nCurrentConnection m_nMaxConnections) { // 添加到表头 pContext-pNext m_pConnectionList; m_pConnectionList pContext; // 更新计数 m_nCurrentConnection ; ::LeaveCriticalSection(m_ConnectionListLock); return TRUE; } ::LeaveCriticalSection(m_ConnectionListLock); return FALSE; } void CIOCPServer::CloseAConnection(CIOCPContext *pContext) { if(pContext NULL || pContext-bClosing TRUE) return; // 首先从列表中移除要关闭的连接 ::EnterCriticalSection(m_ConnectionListLock); CIOCPContext* pTest m_pConnectionList; if(pTest pContext) { m_pConnectionList pContext-pNext; m_nCurrentConnection --; } else { while(pTest ! NULL pTest-pNext ! pContext) pTest pTest-pNext; if(pTest ! NULL) { pTest-pNext pContext-pNext; m_nCurrentConnection --; } } ::LeaveCriticalSection(m_ConnectionListLock); // 然后关闭客户套节字 ::EnterCriticalSection(pContext-Lock); if(pContext-s ! INVALID_SOCKET) { ::closesocket(pContext-s); pContext-s INVALID_SOCKET; } pContext-bClosing TRUE; ::LeaveCriticalSection(pContext-Lock); } void CIOCPServer::CloseAllConnections() { // 遍历整个连接列表关闭所有的客户套节字 ::EnterCriticalSection(m_ConnectionListLock); CIOCPContext *pContext m_pConnectionList; while(pContext ! NULL) { ::EnterCriticalSection(pContext-Lock); if(pContext-s ! INVALID_SOCKET) { ::closesocket(pContext-s); pContext-s INVALID_SOCKET; } pContext-bClosing TRUE; ::LeaveCriticalSection(pContext-Lock); pContext pContext-pNext; } m_pConnectionList NULL; m_nCurrentConnection 0; ::LeaveCriticalSection(m_ConnectionListLock); } BOOL CIOCPServer::InsertPendingAccept(CIOCPBuffer *pBuffer) { // 将一个I/O缓冲区对象插入到m_pPendingAccepts表中 ::EnterCriticalSection(m_PendingAcceptsLock); if(m_pPendingAccepts NULL) m_pPendingAccepts pBuffer; else { pBuffer-pNext m_pPendingAccepts; m_pPendingAccepts pBuffer; } m_nPendingAcceptCount ; ::LeaveCriticalSection(m_PendingAcceptsLock); return TRUE; } BOOL CIOCPServer::RemovePendingAccept(CIOCPBuffer *pBuffer) { BOOL bResult FALSE; // 遍历m_pPendingAccepts表从中移除pBuffer所指向的缓冲区对象 ::EnterCriticalSection(m_PendingAcceptsLock); CIOCPBuffer *pTest m_pPendingAccepts; if(pTest pBuffer) // 如果是表头元素 { m_pPendingAccepts pBuffer-pNext; bResult TRUE; } else // 不是表头元素的话就要遍历这个表来查找了 { while(pTest ! NULL pTest-pNext ! pBuffer) pTest pTest-pNext; if(pTest ! NULL) { pTest-pNext pBuffer-pNext; bResult TRUE; } } // 更新计数 if(bResult) m_nPendingAcceptCount --; ::LeaveCriticalSection(m_PendingAcceptsLock); return bResult; } void CIOCPServer::ErrorHandle(CIOCPContext *pContext, CIOCPBuffer *pBuffer) { CloseAConnection(pContext); } BOOL CIOCPServer::PostSendToList(CIOCPContext *pContext, CIOCPBuffer *pBuffer)//xss { ::EnterCriticalSection(pContext-Lock); CIOCPNextToSend *ptr pContext-pNextToSend; CIOCPNextToSend * pSend new CIOCPNextToSend(); pSend-pBuffer pBuffer; pSend-pNext NULL; if(ptr NULL) { printf(数据%10.10s ...被直接发送。\n,pBuffer-buff); //::EnterCriticalSection(pContext-Lock); pContext-pNextToSend pSend; //::LeaveCriticalSection(pContext-Lock); if(!PostSend(pContext,pBuffer))//如果没有需要等待的send就直接发送 { ::LeaveCriticalSection(pContext-Lock); return FALSE; } } else { printf(数据%10.10s ...被放入链表结尾。\n,pBuffer-buff); while(ptr-pNext ! NULL) { ptr ptr-pNext; } ptr-pNext pSend;//新的发送请求放在链表结尾 } ::LeaveCriticalSection(pContext-Lock); return TRUE; } BOOL CIOCPServer::PostNextWriteBuffer(CIOCPContext *pContext, CIOCPBuffer *pBuffer)//xss { ::EnterCriticalSection(pContext-Lock); CIOCPNextToSend* pSend pContext-pNextToSend; CIOCPNextToSend* pNextSend NULL; if(pSend ! NULL pSend-pNext ! NULL)//发送成功的pBuffer是队列的第一个发送下一个pNextToSend指向下一个pBuffer由外面释放。 { pNextSend pSend-pNext; if(pNextSend-pBuffer ! NULL) { printf(数据%10.10s ...从链表中弹出被发送。\n,pNextSend-pBuffer-buff); if(!PostSend(pContext,pNextSend-pBuffer)) { delete pSend; pContext-pNextToSend pNextSend; ::LeaveCriticalSection(pContext-Lock); return FALSE; } } } if(pSend ! NULL) { pNextSend pSend-pNext; delete pSend; pContext-pNextToSend pNextSend; } ::LeaveCriticalSection(pContext-Lock); return TRUE; } CIOCPBuffer *CIOCPServer::GetNextReadBuffer(CIOCPContext *pContext, CIOCPBuffer *pBuffer) { if(pBuffer ! NULL) { // 如果与要读的下一个序列号相等则读这块缓冲区 if(pBuffer-nSequenceNumber pContext-nCurrentReadSequence) { return pBuffer; } // 如果不相等则说明没有按顺序接收数据将这块缓冲区保存到连接的pOutOfOrderReads列表中 // 列表中的缓冲区是按照其序列号从小到大的顺序排列的 pBuffer-pNext NULL; CIOCPBuffer *ptr pContext-pOutOfOrderReads; CIOCPBuffer *pPre NULL; while(ptr ! NULL) { if(pBuffer-nSequenceNumber ptr-nSequenceNumber) break; pPre ptr; ptr ptr-pNext; } if(pPre NULL) // 应该插入到表头 { pBuffer-pNext pContext-pOutOfOrderReads; pContext-pOutOfOrderReads pBuffer; } else // 应该插入到表的中间 { pBuffer-pNext pPre-pNext; pPre-pNext pBuffer/*-pNext*/;//xss,个人觉得应该是pPre-pNext pBuffer; } } // 检查表头元素的序列号如果与要读的序列号一致就将它从表中移除返回给用户 CIOCPBuffer *ptr pContext-pOutOfOrderReads; if(ptr ! NULL (ptr-nSequenceNumber pContext-nCurrentReadSequence)) { pContext-pOutOfOrderReads ptr-pNext; return ptr; } return NULL; } BOOL CIOCPServer::PostAccept(CIOCPBuffer *pBuffer) // 在监听套节字上投递Accept请求 { // 设置I/O类型 pBuffer-nOperation OP_ACCEPT; // 投递此重叠I/O DWORD dwBytes; pBuffer-sClient ::WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED); BOOL b m_lpfnAcceptEx(m_sListen, pBuffer-sClient, pBuffer-buff, pBuffer-nLen - ((sizeof(sockaddr_in) 16) * 2),//xss,第一次都是收一个cmd_header sizeof(sockaddr_in) 16, sizeof(sockaddr_in) 16, dwBytes, pBuffer-ol); if(!b ::WSAGetLastError() ! WSA_IO_PENDING) { return FALSE; } if(pBuffer-nOperation 0) { int x 0; } return TRUE; }; BOOL CIOCPServer::PostRecv(CIOCPContext *pContext, CIOCPBuffer *pBuffer) { // 设置I/O类型 pBuffer-nOperation OP_READ; ::EnterCriticalSection(pContext-Lock); // 设置序列号 pBuffer-nSequenceNumber pContext-nReadSequence; // 投递此重叠I/O DWORD dwBytes; DWORD dwFlags 0; WSABUF buf; buf.buf pBuffer-buff; buf.len pBuffer-nLen; if(::WSARecv(pContext-s, buf, 1, dwBytes, dwFlags, pBuffer-ol, NULL) ! NO_ERROR) { if(::WSAGetLastError() ! WSA_IO_PENDING) { printf(WSARecv出错%d\n,WSAGetLastError()); ::LeaveCriticalSection(pContext-Lock); return FALSE; } } // 增加套节字上的重叠I/O计数和读序列号计数 pContext-nOutstandingRecv ; pContext-nReadSequence ; ::LeaveCriticalSection(pContext-Lock); return TRUE; } BOOL CIOCPServer::PostSend(CIOCPContext *pContext, CIOCPBuffer *pBuffer) { // 跟踪投递的发送的数量防止用户仅发送数据而不接收导致服务器抛出大量发送操作 if(pContext-nOutstandingSend m_nMaxSends) return FALSE; // 设置I/O类型增加套节字上的重叠I/O计数 pBuffer-nOperation OP_WRITE; // 投递此重叠I/O DWORD dwBytes; DWORD dwFlags 0; WSABUF buf; buf.buf pBuffer-buff; buf.len pBuffer-nLen; if(::WSASend(pContext-s, buf, 1, dwBytes, dwFlags, pBuffer-ol, NULL) ! NO_ERROR) { int x; if((x::WSAGetLastError()) ! WSA_IO_PENDING) { printf(发送失败错误码%d,x); return FALSE; } } // 增加套节字上的重叠I/O计数 ::EnterCriticalSection(pContext-Lock); pContext-nOutstandingSend ; ::LeaveCriticalSection(pContext-Lock); if(pBuffer-nOperation 0) { int x 0; } return TRUE; } BOOL CIOCPServer::Start(int nPort, int nMaxConnections, int nMaxFreeBuffers, int nMaxFreeContexts, int nInitialReads) { // 检查服务是否已经启动 if(m_bServerStarted) return FALSE; // 保存用户参数 m_nPort nPort; m_nMaxConnections nMaxConnections; m_nMaxFreeBuffers nMaxFreeBuffers; m_nMaxFreeContexts nMaxFreeContexts; m_nInitialReads nInitialReads; // 初始化状态变量 m_bShutDown FALSE; m_bServerStarted TRUE; // 创建监听套节字绑定到本地端口进入监听模式 m_sListen ::WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED); SOCKADDR_IN si; si.sin_family AF_INET; si.sin_port ::ntohs(m_nPort); si.sin_addr.S_un.S_addr INADDR_ANY; if(::bind(m_sListen, (sockaddr*)si, sizeof(si)) SOCKET_ERROR) { m_bServerStarted FALSE; return FALSE; } ::listen(m_sListen, 200); // 创建完成端口对象 m_hCompletion ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0); // 加载扩展函数AcceptEx GUID GuidAcceptEx WSAID_ACCEPTEX; DWORD dwBytes; ::WSAIoctl(m_sListen, SIO_GET_EXTENSION_FUNCTION_POINTER, GuidAcceptEx, sizeof(GuidAcceptEx), m_lpfnAcceptEx, sizeof(m_lpfnAcceptEx), dwBytes, NULL, NULL); // 加载扩展函数GetAcceptExSockaddrs GUID GuidGetAcceptExSockaddrs WSAID_GETACCEPTEXSOCKADDRS; ::WSAIoctl(m_sListen, SIO_GET_EXTENSION_FUNCTION_POINTER, GuidGetAcceptExSockaddrs, sizeof(GuidGetAcceptExSockaddrs), m_lpfnGetAcceptExSockaddrs, sizeof(m_lpfnGetAcceptExSockaddrs), dwBytes, NULL, NULL ); // 将监听套节字关联到完成端口注意这里为它传递的CompletionKey为0 ::CreateIoCompletionPort((HANDLE)m_sListen, m_hCompletion, (DWORD)0, 0); // 注册FD_ACCEPT事件。 // 如果投递的AcceptEx I/O不够线程会接收到FD_ACCEPT网络事件说明应该投递更多的AcceptEx I/O WSAEventSelect(m_sListen, m_hAcceptEvent, FD_ACCEPT); // 创建监听线程 m_hListenThread ::CreateThread(NULL, 0, _ListenThreadProc, this, 0, NULL); return TRUE; } void CIOCPServer::Shutdown() { if(!m_bServerStarted) return; // 通知监听线程马上停止服务 m_bShutDown TRUE; ::SetEvent(m_hAcceptEvent); // 等待监听线程退出 ::WaitForSingleObject(m_hListenThread, INFINITE); ::CloseHandle(m_hListenThread); m_hListenThread NULL; m_bServerStarted FALSE; } DWORD WINAPI CIOCPServer::_ListenThreadProc(LPVOID lpParam) { CIOCPServer *pThis (CIOCPServer*)lpParam; // 先在监听套节字上投递几个Accept I/O CIOCPBuffer *pBuffer; for(int i0; ipThis-m_nInitialAccepts; i) { pBuffer pThis-AllocateBuffer(BUFFER_SIZE);//xss,BUFFER_SIZE if(pBuffer NULL) return -1; pThis-InsertPendingAccept(pBuffer); pThis-PostAccept(pBuffer); } // 构建事件对象数组以便在上面调用WSAWaitForMultipleEvents函数 HANDLE hWaitEvents[2 MAX_THREAD]; int nEventCount 0; hWaitEvents[nEventCount ] pThis-m_hAcceptEvent; hWaitEvents[nEventCount ] pThis-m_hRepostEvent; // 创建指定数量的工作线程在完成端口上处理I/O for(int i0; iMAX_THREAD; i) { hWaitEvents[nEventCount ] ::CreateThread(NULL, 0, _WorkerThreadProc, pThis, 0, NULL); } // 下面进入无限循环处理事件对象数组中的事件 while(TRUE) { int nIndex ::WSAWaitForMultipleEvents(nEventCount, hWaitEvents, FALSE, 60*1000, FALSE); // 首先检查是否要停止服务 if(pThis-m_bShutDown || nIndex WSA_WAIT_FAILED) { // 关闭所有连接 pThis-CloseAllConnections(); ::Sleep(0); // 给I/O工作线程一个执行的机会 // 关闭监听套节字 ::closesocket(pThis-m_sListen); pThis-m_sListen INVALID_SOCKET; ::Sleep(0); // 给I/O工作线程一个执行的机会 // 通知所有I/O处理线程退出 for(int i2; iMAX_THREAD 2; i) { ::PostQueuedCompletionStatus(pThis-m_hCompletion, -1, 0, NULL); } // 等待I/O处理线程退出 ::WaitForMultipleObjects(MAX_THREAD, hWaitEvents[2], TRUE, 5*1000); for(int i2; iMAX_THREAD 2; i) { ::CloseHandle(hWaitEvents[i]); } ::CloseHandle(pThis-m_hCompletion); pThis-FreeBuffers(); pThis-FreeContexts(); ::ExitThread(0); } // 1定时检查所有未返回的AcceptEx I/O的连接建立了多长时间 if(nIndex WSA_WAIT_TIMEOUT) { pBuffer pThis-m_pPendingAccepts; while(pBuffer ! NULL) { int nSeconds; int nLen sizeof(nSeconds); // 取得连接建立的时间 ::getsockopt(pBuffer-sClient, SOL_SOCKET, SO_CONNECT_TIME, (char *)nSeconds, nLen); // 如果超过2分钟客户还不发送初始数据就让这个客户go away if(nSeconds ! -1 nSeconds /*2*60*/50) { closesocket(pBuffer-sClient); pBuffer-sClient INVALID_SOCKET; } pBuffer pBuffer-pNext; } } else { nIndex nIndex - WAIT_OBJECT_0; WSANETWORKEVENTS ne; int nLimit0; if(nIndex 0) // 2m_hAcceptEvent事件对象受信说明投递的Accept请求不够需要增加 { ::WSAEnumNetworkEvents(pThis-m_sListen, hWaitEvents[nIndex], ne); if(ne.lNetworkEvents FD_ACCEPT) { nLimit 50; // 增加的个数这里设为50个 } } else if(nIndex 1) // 3m_hRepostEvent事件对象受信说明处理I/O的线程接受到新的客户 { nLimit InterlockedExchange(pThis-m_nRepostCount, 0); } else if(nIndex 1) // I/O服务线程退出说明有错误发生关闭服务器 { pThis-m_bShutDown TRUE; continue; } // 投递nLimit个AcceptEx I/O请求 int i 0; while(i nLimit pThis-m_nPendingAcceptCount pThis-m_nMaxAccepts) { pBuffer pThis-AllocateBuffer(BUFFER_SIZE); if(pBuffer ! NULL) { pThis-InsertPendingAccept(pBuffer); pThis-PostAccept(pBuffer); } } } } return 0; } DWORD WINAPI CIOCPServer::_WorkerThreadProc(LPVOID lpParam) { #ifdef _DEBUG ::OutputDebugString( WorkerThread 启动... \n); #endif // _DEBUG CIOCPServer *pThis (CIOCPServer*)lpParam; CIOCPBuffer *pBuffer NULL; DWORD dwKey; DWORD dwTrans; LPOVERLAPPED lpol; while(TRUE) { // 在关联到此完成端口的所有套节字上等待I/O完成 BOOL bOK ::GetQueuedCompletionStatus(pThis-m_hCompletion, dwTrans, (LPDWORD)dwKey, (LPOVERLAPPED*)lpol, WSA_INFINITE); if(dwTrans -1) // 用户通知退出 { #ifdef _DEBUG ::OutputDebugString( WorkerThread 退出 \n); #endif // _DEBUG ::ExitThread(0); } if(dwTrans ! -2) pBuffer CONTAINING_RECORD(lpol, CIOCPBuffer, ol); int nError NO_ERROR; if(!bOK) // 在此套节字上有错误发生 { printf(完成端口套接字上有错误%d\n,GetLastError()); SOCKET s; if(pBuffer-nOperation OP_ACCEPT) { s pThis-m_sListen; } else { if(dwKey 0) break; s ((CIOCPContext*)dwKey)-s; } DWORD dwFlags 0; if(!::WSAGetOverlappedResult(s, pBuffer-ol, dwTrans, FALSE, dwFlags)) { nError ::WSAGetLastError(); } } pThis-HandleIO(dwKey, pBuffer, dwTrans, nError); printf(Buffer:%d Context:%d\n,iBufferCount,iContextCount); } #ifdef _DEBUG ::OutputDebugString( WorkerThread 退出 \n); #endif // _DEBUG return 0; } int g_x 0; void CIOCPServer::HandleIO(DWORD dwKey, CIOCPBuffer *pBuffer, DWORD dwTrans, int nError) { CIOCPContext *pContext (CIOCPContext *)dwKey; #ifdef _DEBUG ::OutputDebugString( HandleIO... \n); #endif // _DEBUG // 1首先减少套节字上的未决I/O计数 if(dwTrans -2) { CloseAConnection(pContext); return; } if(pContext ! NULL) { ::EnterCriticalSection(pContext-Lock); if(pBuffer-nOperation OP_READ) pContext-nOutstandingRecv --; else if(pBuffer-nOperation OP_WRITE) pContext-nOutstandingSend --; ::LeaveCriticalSection(pContext-Lock); // 2检查套节字是否已经被我们关闭 if(pContext-bClosing) { #ifdef _DEBUG ::OutputDebugString( 检查到套节字已经被我们关闭 \n); #endif // _DEBUG if(pContext-nOutstandingRecv 0 pContext-nOutstandingSend 0) { ReleaseContext(pContext); pContext NULL; } // 释放已关闭套节字的未决I/O ReleaseBuffer(pBuffer); pBuffer NULL; return; } } else { RemovePendingAccept(pBuffer); } // 3检查套节字上发生的错误如果有的话通知用户然后关闭套节字 if(nError ! NO_ERROR) { if(pBuffer-nOperation ! OP_ACCEPT) { OnConnectionError(pContext, pBuffer, nError); CloseAConnection(pContext); if(pContext-nOutstandingRecv 0 pContext-nOutstandingSend 0) { ReleaseContext(pContext); pContext NULL; } #ifdef _DEBUG ::OutputDebugString( 检查到客户套节字上发生错误 \n); #endif // _DEBUG } else // 在监听套节字上发生错误也就是监听套节字处理的客户出错了 { // 客户端出错释放I/O缓冲区 if(pBuffer-sClient ! INVALID_SOCKET) { ::closesocket(pBuffer-sClient); pBuffer-sClient INVALID_SOCKET; } #ifdef _DEBUG ::OutputDebugString( 检查到监听套节字上发生错误 \n); #endif // _DEBUG } ReleaseBuffer(pBuffer); pBuffer NULL; return; } // 开始处理 if(pBuffer-nOperation OP_ACCEPT) { if(dwTrans 0) { #ifdef _DEBUG ::OutputDebugString( 监听套节字上客户端关闭 \n); #endif // _DEBUG if(pBuffer-sClient ! INVALID_SOCKET) { ::closesocket(pBuffer-sClient); pBuffer-sClient INVALID_SOCKET; } } else { // 为新接受的连接申请客户上下文对象 CIOCPContext *pClient AllocateContext(pBuffer-sClient); if(pClient ! NULL) { if(AddAConnection(pClient)) { // 取得客户地址 int nLocalLen, nRmoteLen; LPSOCKADDR pLocalAddr, pRemoteAddr; m_lpfnGetAcceptExSockaddrs( pBuffer-buff, pBuffer-nLen - (sizeof(sockaddr_in) 16) * 2/*sizeof(cmd_header)*/, sizeof(sockaddr_in) 16, sizeof(sockaddr_in) 16, (SOCKADDR **)pLocalAddr, nLocalLen, (SOCKADDR **)pRemoteAddr, nRmoteLen); memcpy(pClient-addrLocal, pLocalAddr, nLocalLen); memcpy(pClient-addrRemote, pRemoteAddr, nRmoteLen); // 关联新连接到完成端口对象 ::CreateIoCompletionPort((HANDLE)pClient-s, m_hCompletion, (DWORD)pClient, 0); // 通知用户 pBuffer-nLen dwTrans; OnConnectionEstablished(pClient, pBuffer); if(pClient-bClosing pClient-nOutstandingRecv 0 pClient-nOutstandingSend 0) { ReleaseContext(pClient); pContext NULL; } else if(pClient-hTimer NULL)//接收一个客户端的同时创建一个检测I/O超时的Timer { pClient-hCompletion m_hCompletion; CreateTimerQueueTimer(pClient-hTimer,m_hTimerQueue,(WAITORTIMERCALLBACK)TimerRoutine,(PVOID)pClient,60*1000,0,0); } // 向新连接投递Read请求或者Write请求直接关闭这些空间在套节字关闭或出错时释放 // CIOCPBuffer *p AllocateBuffer(BUFFER_SIZE); // if(p ! NULL) // { // if(!PostRecv(pClient, p)) // { // CloseAConnection(pClient); // } // } } else // 连接数量已满关闭连接 { CloseAConnection(pClient); ReleaseContext(pClient); pContext NULL; } } else { // 资源不足关闭与客户的连接即可 ::closesocket(pBuffer-sClient); pBuffer-sClient INVALID_SOCKET; } } // Accept请求完成释放I/O缓冲区 ReleaseBuffer(pBuffer); pBuffer NULL; // 通知监听线程继续再投递一个Accept请求 ::InterlockedIncrement(m_nRepostCount); ::SetEvent(m_hRepostEvent); } else if(pBuffer-nOperation OP_READ) { if(dwTrans 0) // 对方关闭套节字 { // 先通知用户 pBuffer-nLen 0; OnConnectionClosing(pContext, pBuffer); // 再关闭连接 CloseAConnection(pContext); // 释放客户上下文和缓冲区对象 if(pContext-nOutstandingRecv 0 pContext-nOutstandingSend 0) { ReleaseContext(pContext); pContext NULL; } ReleaseBuffer(pBuffer); pBuffer NULL; } else { pBuffer-nLen dwTrans; // 按照I/O投递的顺序读取接收到的数据 CIOCPBuffer *p GetNextReadBuffer(pContext, pBuffer); while(p ! NULL) { // 通知用户 OnReadCompleted(pContext, p); // 增加要读的序列号的值 ::InterlockedIncrement((LONG*)pContext-nCurrentReadSequence); // 释放这个已完成的I/O ReleaseBuffer(p); p GetNextReadBuffer(pContext, NULL); } if(pContext-bClosing pContext-nOutstandingRecv 0 pContext-nOutstandingSend 0) { ReleaseContext(pContext); pContext NULL; } else if(pContext-hTimer ! NULL) { ChangeTimerQueueTimer(m_hTimerQueue,pContext-hTimer,60*1000,0);//重置监视时间当一个投递完成后60s内无任何交互则断开。 } // 继续投递一个新的接收请求 // pBuffer AllocateBuffer(BUFFER_SIZE); //if(pBuffer NULL || !PostRecv(pContext, pBuffer)) //{ // CloseAConnection(pContext); //} } } else if(pBuffer-nOperation OP_WRITE) { if(dwTrans 0) // 对方关闭套节字 { // 先通知用户 pBuffer-nLen 0; OnConnectionClosing(pContext, pBuffer); // 再关闭连接 CloseAConnection(pContext); // 释放客户上下文和缓冲区对象 if(pContext-nOutstandingRecv 0 pContext-nOutstandingSend 0) { ReleaseContext(pContext); pContext NULL; } ReleaseBuffer(pBuffer); pBuffer NULL; } else { if(pContext-bClosing pContext-nOutstandingRecv 0 pContext-nOutstandingSend 0) { ReleaseContext(pContext); pContext NULL; ReleaseBuffer(pBuffer); pBuffer NULL; return; } else if(pContext-hTimer ! NULL) { ChangeTimerQueueTimer(m_hTimerQueue,pContext-hTimer,60*1000,0); } // 写操作完成通知用户 if(dwTrans pBuffer-nLen)//如果此send没有发送完全则发送剩下的部分此部分如果还是没发完全这里同样进行 { printf(send未发送完全发送%d总长度%d\n,dwTrans,pBuffer-nLen); CIOCPBuffer* p AllocateBuffer(pBuffer-nLen - dwTrans); if(p ! NULL) memcpy(p-buff,pBuffer-buff dwTrans,pBuffer-nLen - dwTrans); if(p NULL || !PostSend(pContext,p)) { CloseAConnection(pContext); return; } } else { if(!PostNextWriteBuffer(pContext,pBuffer)) { CloseAConnection(pContext); return; } } pBuffer-nLen dwTrans; OnWriteCompleted(pContext, pBuffer); if(pContext-bClosing pContext-nOutstandingRecv 0 pContext-nOutstandingSend 0) { ReleaseContext(pContext); pContext NULL; } // 释放SendText函数申请的缓冲区 ReleaseBuffer(pBuffer); pBuffer NULL; } } } BOOL CIOCPServer::SendText(CIOCPContext *pContext, char *pszText, int nLen) { CIOCPBuffer *pBuffer AllocateBuffer(nLen); if(pBuffer ! NULL) { memcpy(pBuffer-buff, pszText, nLen); return PostSend(pContext, pBuffer); } return FALSE; } //投递接收请求示例 //CIOCPBuffer *p AllocateBuffer(BUFFER_SIZE); //if(p ! NULL) //{ // if(!PostRecv(pContext, p)) // { // CloseAConnection(pContext); // } //} //投递发送请求示例 //CIOCPBuffer *p AllocateBuffer(BUFFER_SIZE); //if(p ! NULL) //{ // if(!PostSendToList(pContext, p)) // { // CloseAConnection(pContext); // } //} void CIOCPServer::OnConnectionEstablished(CIOCPContext *pContext, CIOCPBuffer *pBuffer) { //连接建立且第一次数据接收完成。 //接下来可以根据业务逻辑PostRecv收或者PostSendToList发或者CloseAConnection(pContext)关闭连接 } void CIOCPServer::OnConnectionClosing(CIOCPContext *pContext, CIOCPBuffer *pBuffer) { } void CIOCPServer::OnReadCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer) { //一次数据接收完成。 //接下来可以根据业务逻辑PostRecv收或者PostSendToList发或者CloseAConnection(pContext)关闭连接 } void CIOCPServer::OnWriteCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer) { //一次数据发送完成。 //接下来可以根据业务逻辑PostRecv收或者PostSendToList发或者CloseAConnection(pContext)关闭连接 } void CIOCPServer::OnConnectionError(CIOCPContext *pContext, CIOCPBuffer *pBuffer, int nError) { }