Текущий архив: 2006.04.09;
Скачать: CL | DM;
ВнизCreateNamedPipe Найти похожие ветки
← →
serguar (2006-01-19 09:54) [0]Здравствуйте! Возникла следующая проблема:
пытаюсь организовать межпроцессорное взаимодействие между сервисом и GUI приложениями через именованные каналы.
При попытке создать именованный канал возникает ошибка:
Error 52 "Не удалось подключиться к сети из-за существования совпадающих имен. Измените имя компьютера на панели управления и повторите попытку"
Мой исходный код:
const
lpszPipeName = "\\\\.\\pipe\\MyPipe";
var
hNamedPipe: THandle;
szBuf: array[0..512] of Char;
cbRead: DWORD;
cbWritten: DWORD;
Err: Integer;
begin
Writeln("Named pipe server demo");
hNamedPipe := CreateNamedPipe(PChar(lpszPipeName), PIPE_ACCESS_DUPLEX,
PIPE_TYPE_MESSAGE or PIPE_READMODE_MESSAGE or PIPE_WAIT, PIPE_UNLIMITED_INSTANCES,
512, 512, 5000, nil);
if hNamedPipe = INVALID_HANDLE_VALUE then begin
Writeln("CreateNamedPipe: Error " + SysErrorMessage(GetLastError));
Readln;
Exit;
end;
...
далее код работы с каналом
В чем проблема? Заранее благодарен.
← →
Digitman © (2006-01-19 10:14) [1]lpszPipeName = "\\.\pipe\MyPipe"; //!!!!!
← →
Digitman © (2006-01-19 10:16) [2]пример использования прогр.каналов :
unit IPCThread;
interface
uses
Windows, Messages, SysUtils, Classes;
const
sIPCPipeName = "IPCWinEtka";
MAX_MSG_SIZE = 1024;
DEFAULT_TIMEOUT = 5000;
TM_CONNECT = WM_USER + 1000;
TM_SENDMSG = WM_USER + 1001;
type
ETransportError = class(Exception); //исключение по отказу транспорта (например, соединение разорвано)
EActionError = class(Exception); //исключение в ходе исполнения акции
TSignature = (CallSig, ResultSig); //сигнатура транспортного пакета
TAction = Integer; //код акции
//заголовок информациионного пакета
PDataBlockHeader = ^TDataBlockHeader;
TDataBlockHeader = packed record
Signature: TSignature; //сингатура
Action: TAction; //код акции
ExceptionFlag: Boolean; //флаг исключения для пакета с сигнатурой результата
WaitEvent: DWord; //событие ожидания результата инф.транзакции
ParamSize: DWord; //кол-во байт параметра акции
end;
//информационный пакет
PDataBlock = ^TDataBlock;
TDataBlock = packed record
Header: TDataBlockHeader; //заголовок
Params: array[0..MAX_MSG_SIZE - SizeOf(TDataBlockHeader) - 1] of Byte; //параметры
end;
const
DataBlockHdrSize = SizeOf(TDataBlockHeader);
type
//прототип ф-ции обработки вызванной акции
TActionHandler = function(Action: TAction; Params: Pointer; ParamSize: DWord; out ResultData; out ResultSize: DWord): Boolean;
//базовый класс транспортного трэда
TIPCThread = class(TThread)
private
FPipeName: String;
hPipe: THandle;
hWnd: THandle;
ovr, ovw: TOverlapped;
FEvents: array[0..1] of THandle;
FConnected, FPendingConnect, FPendingRead, FPendingWrite: Boolean;
FInBuf, FOutBuf: PDataBlock; //буферы приема/передачи
FBytesRead, FBytesWritten: DWord;
FActionHandler: TActionHandler;
FSendQueue: TThreadList; //очередь передаваемых инф.пакетов
FPendingQueue: TList; //список объектов-событий ожидания результатов акций
FRecvQueue: TThreadList; //очередь принятых инф.пакетов
procedure ClearSendingQueue;
procedure ClearReceivingQueue;
procedure ClearPendingQueue;
procedure ProcessReadEvent;
procedure ProcessWriteEvent;
procedure ProcessMessages;
procedure ProcessTransport;
procedure ProcessIncomingCall;
procedure ProcessIncomingResult;
procedure MsgQuit(var Message: TMessage); message WM_QUIT;
procedure MsgSendMsg(var Message: TMessage); message TM_SENDMSG;
protected
procedure DoTerminate; override;
procedure DoConnect; virtual;
procedure DoDisconnect; virtual;
procedure Execute; override;
public
constructor Create(WndHandle: THandle; PipeName: String; ActionHandler: TActionHandler);
destructor Destroy; override;
//после УСПЕШНОГО исполнения акции во избежание утечек памяти
//следует вызвать FreeMem(ResultData), если ResultSize > 0
function CallAction(Action: TAction; Params: Pointer; ParamSize: DWord; out ResultData: Pointer; out ResultSize: DWord): Boolean;
property Connected: Boolean read FConnected;
end;
//класс транспортного трэда клиента
TIPCClientThread = class(TIPCThread)
private
FServerName: String;
protected
procedure DoConnect; override;
procedure DoDisconnect; override;
public
constructor Create(WndHandle: THandle; ServerName, PipeName: String; ActionHandler: TActionHandler);
end;
//класс транспортного трэда сервера
TIPCServerThread = class(TIPCThread)
private
protected
procedure DoConnect; override;
procedure DoDisconnect; override;
public
end;
см. продолжение ..
← →
Digitman © (2006-01-19 10:17) [3]
implementation
{ TIPCThread }
constructor TIPCThread.Create(WndHandle: THandle; PipeName: String; ActionHandler: TActionHandler);
begin
hWnd := WndHandle;
FPipeName := PipeName;
FActionHandler := ActionHandler;
FillChar(ovr, sizeof(ovr), 0);
FillChar(ovw, sizeof(ovw), 0);
ovr.hEvent := CreateEvent(nil, False, False, nil);
ovw.hEvent := CreateEvent(nil, False, False, nil);
FEvents[0] := ovr.hEvent;
FEvents[1] := ovw.hEvent;
GetMem(FInBuf, MAX_MSG_SIZE);
GetMem(FOutBuf, MAX_MSG_SIZE);
FSendQueue := TThreadList.Create;
FRecvQueue := TThreadList.Create;
FPendingQueue := TList.Create;
inherited Create(False);
end;
destructor TIPCThread.Destroy;
begin
PostThreadMessage(ThreadId, WM_QUIT, 0, 0);
inherited;
if hPipe <> 0 then
CloseHandle(hPipe);
FRecvQueue.Free;
FSendQueue.Free;
FPendingQueue.Free;
CloseHandle(ovr.hEvent);
CloseHandle(ovw.hEvent);
CloseHandle(hPipe);
FreeMem(FInBuf);
FreeMem(FOutBuf);
end;
procedure TIPCThread.Execute;
var
Msg: TMsg;
begin
PeekMessage(Msg, 0, 0, 0, PM_NOREMOVE);
try
//пока трансп.трэд не терминирован
while not Terminated do
begin
DoConnect; //соединение с удаленным партнером
if Terminated then Exit;
//пока транспорт активен и трэд не терминирован
while not Terminated and FConnected do
ProcessTransport; //обработка транспортных событий
end;
finally
DoDisconnect;
end;
end;
//обработка сообщений трансп.трэду
procedure TIPCThread.ProcessMessages;
var
Msg: TMsg;
begin
while PeekMessage(Msg, 0, 0, 0, PM_REMOVE) do
Dispatch(Msg.Message);
end;
//очистка списка объектов-событий ожидания рез-тов вызванных акций
procedure TIPCThread.ClearPendingQueue;
var
i: Integer;
begin
with FPendingQueue do
begin
for i := 0 to Count - 1 do
CloseHandle(THandle(Items[i]));
Clear;
end;
end;
//очистка очереди передачи
procedure TIPCThread.ClearSendingQueue;
var
Msg: TMsg;
DataBlock: PDataBlock;
List: TList;
i: Integer;
begin
while PeekMessage(Msg, 0, TM_SENDMSG, TM_SENDMSG, PM_REMOVE) do
begin
DataBlock := PDataBlock(Msg.wParam);
with DataBlock.Header do
if (Signature = CallSig) and (WaitEvent <> 0) then
CloseHandle(WaitEvent);
FreeMem(DataBlock);
end;
List := FSendQueue.LockList;
with List do
try
for i:= 0 to Count - 1 do
begin
DataBlock := Items[i];
with DataBlock.Header do
if (Signature = CallSig) and (WaitEvent <> 0) then
CloseHandle(WaitEvent);
FreeMem(DataBlock);
end;
Clear;
finally
FSendQueue.UnLockList;
end;
end;
//очистка очереди приема
procedure TIPCThread.ClearReceivingQueue;
var
List: TList;
i: Integer;
DataBlock: PDataBlock;
begin
List := FRecvQueue.LockList;
with List do
try
for i:= 0 to Count - 1 do
begin
DataBlock := Items[i];
if DataBlock.Header.Signature = ResultSig then
CloseHandle(DataBlock.Header.WaitEvent);
FreeMem(DataBlock);
end;
Clear;
finally
FRecvQueue.UnLockList;
end;
end;
← →
Digitman © (2006-01-19 10:17) [4]
//обработка трансп.событий
procedure TIPCThread.ProcessTransport;
var
Msg: TMsg;
DataBlock: PDataBlock;
Size: DWord;
begin
if not FPendingRead then
begin
FPendingRead := not ReadFile(hPipe, FInBuf^, MAX_MSG_SIZE, FBytesRead, @ovr) and (GetLastError = ERROR_IO_PENDING);
if not FPendingRead then
DoDisconnect
end
else
case MsgWaitForMultipleObjects(2, FEvents, False, INFINITE, QS_SENDMESSAGE or QS_POSTMESSAGE) of
WAIT_OBJECT_0 : ProcessReadEvent;
WAIT_OBJECT_0 + 1 : ProcessWriteEvent;
WAIT_OBJECT_0 + 2 : ProcessMessages;
end;
end;
//событие приема данных
procedure TIPCThread.ProcessReadEvent;
var
ovl_Result: Boolean;
begin
FPendingRead := False;
ovl_Result := GetOverlappedResult(hPipe, ovr, FBytesRead, False);
if ovl_Result and (FBytesRead = 0) then Exit;
if not ovl_Result or (FBytesRead <> (DataBlockHdrSize + FInBuf.Header.ParamSize)) then
DoDisconnect
else
case FInBuf.Header.Signature of
CallSig: ProcessIncomingCall;
ResultSig: ProcessIncomingResult;
else
DoDisconnect;
end;
end;
//событие передачи данных
procedure TIPCThread.ProcessWriteEvent;
var
DataBlock: PDataBlock;
BlockSize: DWord;
List: TList;
ovl_Result: Boolean;
begin
FPendingWrite := False;
ovl_Result := GetOverlappedResult(hPipe, ovw, FBytesWritten, False);
if ovl_Result and (FBytesWritten = 0) then Exit;
if not ovl_Result or (FBytesWritten <> (DataBlockHdrSize + FOutBuf.Header.ParamSize)) then
DoDisconnect
else
begin
List := FSendQueue.LockList;
with List do
try
if Count > 0 then
begin
DataBlock := Items[0];
Delete(0);
end
else
DataBlock := nil;
finally
FSendQueue.UnLockList;
end;
if Assigned(DataBlock) then
begin
BlockSize := DataBlockHdrSize + DataBlock.Header.ParamSize;
CopyMemory(FOutBuf, DataBlock, BlockSize);
FreeMem(DataBlock);
if WriteFile(hPipe, FOutBuf^, BlockSize, FBytesWritten, @ovw) then Exit;
FPendingWrite := GetLastError = ERROR_IO_PENDING;
if not FPendingWrite then
with FOutBuf.Header do
begin
if (Signature = CallSig) and (WaitEvent <> 0) then
CloseHandle(WaitEvent);
DoDisconnect;
end;
end;
end;
end;
//обработка входящих запросов на исполнение акции
procedure TIPCThread.ProcessIncomingCall;
const
ActionNotSupported: PChar = "Акция не поддерживается удаленной стороной";
var
ResultDataBlock: PDataBlock;
ResultDataBlockSize: DWord;
excptmsg: String;
begin
with FInBuf^ do
try
Header.ExceptionFlag := True;
if not Assigned(FActionHandler)
or not FActionHandler(Header.Action, @Params, Header.ParamSize, Params, Header.ParamSize) then
begin
StrCopy(@Params, ActionNotSupported);
Header.ParamSize := StrLen(ActionNotSupported) + 1;
end
else
Header.ExceptionFlag := False;
except
on e:Exception do
begin
excptmsg := "Исполнение акции вызвало исключительную ситуацию"#10 + e.ClassName + #10 + e.Message;
StrCopy(@Params, PChar(excptmsg));
Header.ParamSize := Length(excptmsg) + 1;
end;
end;
ResultDataBlockSize := DataBlockHdrSize + FInBuf.Header.ParamSize;
GetMem(ResultDataBlock, ResultDataBlockSize);
with ResultDataBlock^ do
begin
Header.Action := FInBuf.Header.Action;
Header.WaitEvent := FInBuf.Header.WaitEvent;
Header.Signature := ResultSig;
Header.ExceptionFlag := FInBuf.Header.ExceptionFlag;
Header.ParamSize := FInBuf.Header.ParamSize;
if Header.ParamSize > 0 then
CopyMemory(@Params, @FInBuf.Params, FInBuf.Header.ParamSize);
PostThreadMessage(ThreadId, TM_SENDMSG, Cardinal(ResultDataBlock), ResultDataBlockSize);
end;
end;
← →
Digitman © (2006-01-19 10:18) [5]
//обработка входящих результатов исполнения акции
procedure TIPCThread.ProcessIncomingResult;
var
List: TList;
DataBlock: PDataBlock;
BlockSize: DWord;
idx: Integer;
begin
BlockSize := DataBlockHdrSize + FInBuf.Header.ParamSize;
List := FRecvQueue.LockList;
with List do
try
GetMem(DataBlock, BlockSize);
CopyMemory(DataBlock, FInBuf, BlockSize);
idx := FPendingQueue.IndexOf(Pointer(DataBlock.Header.WaitEvent));
if idx >= 0 then
begin
FPendingQueue.Delete(idx);
if SetEvent(DataBlock.Header.WaitEvent) then
Add(DataBlock)
else
FreeMem(DataBlock);
end
else
FreeMem(DataBlock);
finally
FRecvQueue.UnLockList;
end;
end;
//обработка сообщений на постановку инф.блока в очередь на передачу
procedure TIPCThread.MsgSendMsg(var Message: TMessage);
var
List: TList;
DataBlock: PDataBlock;
BlockSize: DWord;
idx: Integer;
begin
DataBlock := PDataBlock(Message.wParam);
if FPendingWrite then
begin
List := FSendQueue.LockList;
try
List.Add(DataBlock);
finally
FSendQueue.UnLockList;
end;
end
else
begin
BlockSize := Message.lParam;
CopyMemory(FOutBuf, DataBlock, BlockSize);
FreeMem(DataBlock);
with FOutBuf.Header do
begin
if (Signature = CallSig) and (WaitEvent <> 0) then
idx := FPendingQueue.Add(Pointer(WaitEvent));
end;
if WriteFile(hPipe, FOutBuf^, BlockSize, FBytesWritten, @ovw) then
Exit;
FPendingWrite := GetLastError = ERROR_IO_PENDING;
if not FPendingWrite then
DoDisconnect;
end;
end;
//команда на завершение трэда
procedure TIPCThread.MsgQuit(var Message: TMessage);
begin
Terminate;
end;
//вызов акции на исполнение удаленной стороной
//Вх.аргументы :
// Action - код акции
// Params - параметры акции
// ParamSize - число байт параметров акции
//Вых.аргументы:
// ResultData - данные, предст. результаты исполненной акции
// ResultSize - число байт результата
//ф-ция возвращает True, если транзакция успешно завершена, False - иначе
//ф-ция так же возбуждает исключения ETransportError и EActionError
function TIPCThread.CallAction(Action: TAction; Params: Pointer;
ParamSize: DWord; out ResultData: Pointer; out ResultSize: DWord): Boolean;
var
DataBlock: PDataBlock;
BlockSize: DWord;
hEvent: THandle;
List: TList;
i: Integer;
begin
Result := False;
if not FConnected then Exit;
BlockSize := DataBlockHdrSize + ParamSize;
GetMem(DataBlock, BlockSize);
hEvent := CreateEvent(nil, False, False, nil);
with DataBlock^ do
begin
Header.Signature := CallSig;
Header.Action := Action;
Header.WaitEvent := hEvent;
Header.ParamSize := ParamSize;
end;
CopyMemory(@DataBlock.Params, Params, ParamSize);
if not PostThreadMessage(ThreadId, TM_SENDMSG, Cardinal(DataBlock), BlockSize) then
begin
CloseHandle(hEvent);
FreeMem(DataBlock);
raise ETransportError.Create("Соединение разорвано");
end;
DataBlock := nil;
while True do
case WaitForSingleObject(hEvent, 10) of
WAIT_OBJECT_0:
begin
List := FRecvQueue.LockList;
with List do
try
for i := 0 to Count - 1 do
if PDataBlock(Items[i]).Header.WaitEvent = hEvent then
begin
DataBlock := Items[i];
CloseHandle(DataBlock.Header.WaitEvent);
Delete(i);
Break;
end;
Break;
finally
FRecvQueue.UnLockList;
end;
end;
WAIT_FAILED: raise ETransportError.Create("Соединение разорвано");
WAIT_TIMEOUT: Continue;
end;
if Assigned(DataBlock) then
try
if not DataBlock.Header.ExceptionFlag then
begin
ResultSize := DataBlock.Header.ParamSize;
GetMem(ResultData, ResultSize);
CopyMemory(ResultData, @DataBlock.Params, ResultSize);
Result := True;
end
else
raise EActionError.Create(StrPas(@DataBlock.Params));
finally
FreeMem(DataBlock);
end;
end;
procedure TIPCThread.DoConnect;
begin
if hWnd <> 0 then
PostMessage(hWnd, TM_CONNECT, 0, 0);
end;
//разрыв трансп.соединения
procedure TIPCThread.DoDisconnect;
begin
if not FConnected then Exit;
if FPendingConnect or FPendingRead or FPendingWrite then
CancelIO(hPipe);
ClearSendingQueue;
ClearReceivingQueue;
ClearPendingQueue;
FConnected := False;
end;
procedure TIPCThread.DoTerminate;
begin
end;
{ TIPCClientThread }
constructor TIPCClientThread.Create(WndHandle: THandle; ServerName, PipeName: String; ActionHandler: TActionHandler);
begin
FServerName := Trim(ServerName);
if FServerName = "" then
FServerName := ".";
inherited Create(WndHandle, PipeName, ActionHandler);
end;
procedure TIPCClientThread.DoConnect;
var
FullPipeName: String;
begin
FullPipeName := "\\" + FServerName + "\pipe\" + FPipeName;
if WaitNamedPipe(PChar(FullPipeName), DEFAULT_TIMEOUT) then
begin
hPipe := CreateFile(PChar(FullPipeName),
GENERIC_READ or GENERIC_WRITE, 0, nil,
OPEN_EXISTING,
FILE_FLAG_NO_BUFFERING
or FILE_FLAG_WRITE_THROUGH
or FILE_FLAG_OVERLAPPED,
0);
FConnected := hPipe <> 0;
end;
if not FConnected and (GetLastError = ERROR_FILE_NOT_FOUND) then
Sleep(100)
else
inherited;
end;
procedure TIPCClientThread.DoDisconnect;
begin
inherited;
CloseHandle(hPipe);
hPipe := 0;
end;
{ TIPCServerThread }
procedure TIPCServerThread.DoConnect;
var
ovc: TOverlapped;
WaitResult: DWord;
begin
if hPipe = 0 then
begin
hPipe := CreateNamedPipe(PChar("\\.\pipe\" + FPipeName),
PIPE_ACCESS_DUPLEX
or FILE_FLAG_WRITE_THROUGH
or FILE_FLAG_OVERLAPPED,
PIPE_TYPE_MESSAGE or PIPE_READMODE_MESSAGE or PIPE_WAIT,
1, MAX_MSG_SIZE, MAX_MSG_SIZE, DEFAULT_TIMEOUT, nil);
if hPipe = 0 then Exit;
end;
FillChar(ovc, SizeOf(ovc), 0);
ovc.hEvent := CreateEvent(nil, False, False, nil);
ConnectNamedPipe(hPipe, @ovc);
FPendingConnect := GetLastError = ERROR_IO_PENDING;
while not Terminated and FPendingConnect do
begin
WaitResult := MsgWaitForMultipleObjects(1, ovc.hEvent, False, INFINITE, QS_ALLINPUT);
case WaitResult of
WAIT_OBJECT_0:
begin
FPendingConnect := False;
FConnected := GetOverlappedResult(hPipe, ovc, FBytesRead, False);
if FConnected then
inherited;
end;
WAIT_OBJECT_0 + 1: ProcessMessages;
end;
end;
CloseHandle(ovc.hEvent);
end;
procedure TIPCServerThread.DoDisconnect;
begin
inherited;
DisconnectNamedPipe(hPipe);
end;
end.
← →
serguar (2006-01-19 10:50) [6]Большое спасибо.
У меня все заработало. Только не понятно, у Рихтера я прочитал что имя канала как у меня в первом варианте. Тогда в чем разница? И еще у Рихтера работает даже если клиент соединяется с сетевой машины!!! Круто - никаких тебе сокетов не надо. А у меня по сети почему то не работает. Только на локальном узле.
← →
serguar (2006-01-19 10:51) [7]Еще раз спасибо за пример.
Это теперь что получается: мне самому и думать не надо? Все уже готово? Буду пробовать.
← →
Игорь Шевченко © (2006-01-19 10:51) [8]
> Только не понятно, у Рихтера я прочитал что имя канала как
> у меня в первом варианте
В языке С символ \ кодируется как \\ - соглашение такое
← →
serguar (2006-01-19 10:59) [9]> Игорь Шевченко
> В языке С символ \ кодируется как \\ - соглашение такое
Спасибо. Я это запомню.
← →
Digitman © (2006-01-19 11:01) [10]
> Это теперь что получается: мне самому и думать не надо?
Думать надо ВСЕГДА.
Примеры даются не для голого и бездумного их передирания и использования, а для анализа в них реализованой логики.
> Все уже готово?
В принципе - да.
Но учти что в дан.случае есть ограничение на размер транспортного пакета (введено для упрощения реализации трансп.уровня) :
MAX_MSG_SIZE = 1024;
← →
serguar (2006-01-19 11:13) [11]
> Но учти что в дан.случае есть ограничение на размер транспортного
> пакета (введено для упрощения реализации трансп.уровня)
> :
А чем обусловлено ограничение? Только константой? Я так понимаю работа с каналом ведется как обычным файлом. Соответственно и ограничение на размер пакета можно принять равным максимальному размеру файла принятого в ОС. Мне конечно такой большой не надо. Но все же...
← →
Digitman © (2006-01-19 11:29) [12]Я же сказал - для упрощения реализации трансп.уровня.
Асинхронный транспорт пакетов произвольного размера сложней в реализации (требуется организация стримов приема и передачи).
В дан. же случае нично не мешает на передающей стороне разбить исходный инф.пакет на куски размером в MAX_MSG_SIZE и передать их последовательно, а напринимающей стороне соответственно "склеить" их.
← →
Игорь Шевченко © (2006-01-19 11:30) [13]Кстати, по теме:
http://www.delphimaster.ru/articles/named_pipes/index.html
← →
Alex Konshin © (2006-01-19 11:36) [14]У меня тоже пример есть на сайте. Он в примере MSSQLAlert.
← →
Digitman © (2006-01-19 15:38) [15]"Кайф" в приведенном мной примере хотя бы в том что:
1. Трансп.уровень прозрачен и отныне не заботит никоим образом.
2. Мультипоточность надстройки над трансп.уровнем : любой поток процесса может вызвать в любой секунд CallAction() с целью осуществить индивидуальную независимую акцию инф.обмена с партнером по коннекту.
Страницы: 1 вся ветка
Текущий архив: 2006.04.09;
Скачать: CL | DM;
Память: 0.55 MB
Время: 0.014 c