From 40620bd83fe0df597c54f584e15bb1a7a6bfdea4 Mon Sep 17 00:00:00 2001 From: Enrico Forestieri Date: Sat, 29 Aug 2009 23:42:06 +0000 Subject: [PATCH] On Windows, use true asynchronous I/O for the pipe server. Now, even the nasty JabRef works. git-svn-id: svn://svn.lyx.org/lyx/lyx-devel/trunk@31239 a592a061-630c-0410-9148-cb99ea01b6c8 --- src/Server.cpp | 512 +++++++++++++++++++++++++++++++++++-------------- src/Server.h | 56 +++++- 2 files changed, 417 insertions(+), 151 deletions(-) diff --git a/src/Server.cpp b/src/Server.cpp index 9837ef7aeb..4c92c89cbf 100644 --- a/src/Server.cpp +++ b/src/Server.cpp @@ -49,6 +49,7 @@ #include "support/debug.h" #include "support/FileName.h" +#include "support/lassert.h" #include "support/lstrings.h" #include "support/os.h" @@ -78,16 +79,16 @@ namespace lyx { #if defined(_WIN32) -class PipeEvent : public QEvent { +class ReadReadyEvent : public QEvent { public: /// - PipeEvent(HANDLE inpipe) : QEvent(QEvent::User), inpipe_(inpipe) + ReadReadyEvent(DWORD inpipe) : QEvent(QEvent::User), inpipe_(inpipe) {} /// - HANDLE pipe() const { return inpipe_; } + DWORD inpipe() const { return inpipe_; } private: - HANDLE inpipe_; + DWORD inpipe_; }; namespace { @@ -138,76 +139,332 @@ LyXComm::LyXComm(string const & pip, Server * cli, ClientCallbackfct ccb) void LyXComm::pipeServer() { - int const bufsize = 1024; - HANDLE inpipe; - - outpipe_ = CreateNamedPipe(external_path(outPipeName()).c_str(), - PIPE_ACCESS_OUTBOUND, PIPE_NOWAIT, - PIPE_UNLIMITED_INSTANCES, - bufsize, 0, 0, NULL); - if (outpipe_ == INVALID_HANDLE_VALUE) { - lyxerr << "LyXComm: Could not create pipe " - << external_path(outPipeName()) << '\n' - << errormsg() << endl; - return; + DWORD i; + + for (i = 0; i <= MAX_PIPES; ++i) { + DWORD open_mode; + string const pipename = external_path(pipeName(i)); + + if (i < MAX_PIPES) { + open_mode = PIPE_ACCESS_INBOUND; + readbuf_[i].erase(); + } else { + open_mode = PIPE_ACCESS_OUTBOUND; + writebuf_.erase(); + } + + // Manual-reset event, initial state = signaled + event_[i] = CreateEvent(NULL, TRUE, TRUE, NULL); + + if (!event_[i]) { + LYXERR0("LyXComm: Could not create event for pipe " + << pipename.c_str() << '\n' << errormsg()); + closeHandles(i); + return; + } + + pipe_[i].overlap.hEvent = event_[i]; + pipe_[i].handle = CreateNamedPipe(pipename.c_str(), + open_mode | FILE_FLAG_OVERLAPPED, PIPE_WAIT, + MAX_PIPES, PIPE_BUFSIZE, PIPE_BUFSIZE, + PIPE_TIMEOUT, NULL); + + if (pipe_[i].handle == INVALID_HANDLE_VALUE) { + LYXERR0("LyXComm: Could not create pipe " + << pipename.c_str() << '\n' << errormsg()); + closeHandles(i); + return; + } + + startPipe(i); + pipe_[i].state = pipe_[i].pending_io ? + CONNECTING_STATE : (i < MAX_PIPES ? READING_STATE + : WRITING_STATE); } - ConnectNamedPipe(outpipe_, NULL); - // Now change to blocking mode - DWORD mode = PIPE_WAIT; - SetNamedPipeHandleState(outpipe_, &mode, NULL, NULL); - - while (!checkStopServerEvent()) { - inpipe = CreateNamedPipe(external_path(inPipeName()).c_str(), - PIPE_ACCESS_INBOUND, PIPE_WAIT, - PIPE_UNLIMITED_INSTANCES, - 0, bufsize, 0, NULL); - if (inpipe == INVALID_HANDLE_VALUE) { - lyxerr << "LyXComm: Could not create pipe " - << external_path(inPipeName()) << '\n' - << errormsg() << endl; + + // Add the stopserver_ event + event_[MAX_PIPES + 1] = stopserver_; + + // We made it! + LYXERR(Debug::LYXSERVER, "LyXComm: Connection established"); + ready_ = true; + DWORD status; + bool success; + + while (!checkStopServer()) { + // Indefinitely wait for the completion of an overlapped + // read, write, or connect operation. + DWORD wait = WaitForMultipleObjects(MAX_PIPES + 2, event_, + FALSE, INFINITE); + + // Determine which pipe instance completed the operation. + i = wait - WAIT_OBJECT_0; + LASSERT(i >= 0 && i <= MAX_PIPES + 1, /**/); + + // Check whether we were waked up for stopping the pipe server. + if (i == MAX_PIPES + 1) break; + + // Get the result if the operation was pending. + if (pipe_[i].pending_io) { + success = GetOverlappedResult(pipe_[i].handle, + &pipe_[i].overlap, &status, FALSE); + + switch (pipe_[i].state) { + case CONNECTING_STATE: + // Pending connect operation + if (!success) { + DWORD const err = GetLastError(); + if (i == MAX_PIPES + && err == ERROR_IO_INCOMPLETE) { + // A reply on the output pipe + // has not been read, still. + // As we have only one instance + // for output, we risk a stalled + // pipe if no one reads it. + // So, if a reader doesn't + // appear within about 5 or 6 + // seconds, we reset it. + static int count = 0; + Sleep(100); + if (++count == 50) { + count = 0; + resetPipe(i, true); + } + } else + LYXERR0("LyXComm: " << errormsg()); + continue; + } + pipe_[i].state = i < MAX_PIPES ? READING_STATE + : WRITING_STATE; + break; + + case READING_STATE: + // Pending read operation + LASSERT(i < MAX_PIPES, /**/); + if (!success || status == 0) { + resetPipe(i); + continue; + } + pipe_[i].nbytes = status; + pipe_[i].state = WRITING_STATE; + break; + + case WRITING_STATE: + // Pending write operation + LASSERT(i == MAX_PIPES, /**/); + if (!success || status != writebuf_.length()) { + resetPipe(i); + continue; + } + break; + } } - BOOL connected = ConnectNamedPipe(inpipe, NULL); - // Check whether we are signaled to shutdown the pipe server. - if (checkStopServerEvent()) { - CloseHandle(inpipe); + // Operate according to the pipe state + switch (pipe_[i].state) { + case READING_STATE: + // The pipe instance is connected to a client + // and is ready to read a request. + LASSERT(i < MAX_PIPES, /**/); + success = ReadFile(pipe_[i].handle, + pipe_[i].pipebuf, PIPE_BUFSIZE - 1, + &pipe_[i].nbytes, &pipe_[i].overlap); + + if (success && pipe_[i].nbytes != 0) { + // The read operation completed successfully. + pipe_[i].pending_io = false; + pipe_[i].state = WRITING_STATE; + continue; + } + + if (!success && GetLastError() == ERROR_IO_PENDING) { + // The read operation is still pending. + pipe_[i].pending_io = true; + continue; + } + + // Client closed connection (ERROR_BROKEN_PIPE) or + // an error occurred; in either case, reset the pipe. + if (GetLastError() != ERROR_BROKEN_PIPE) { + LYXERR0("LyXComm: " << errormsg()); + if (!readbuf_[i].empty()) { + LYXERR0("LyXComm: truncated command: " + << readbuf_[i]); + readbuf_[i].erase(); + } + resetPipe(i, true); + } else + resetPipe(i); + break; + + case WRITING_STATE: + if (i < MAX_PIPES) { + // The request was successfully read + // from the client; commit it. + ReadReadyEvent * event = new ReadReadyEvent(i); + QCoreApplication::postEvent(this, + static_cast(event)); + // Wait for completion + while (pipe_[i].nbytes && !checkStopServer()) + Sleep(100); + pipe_[i].pending_io = false; + pipe_[i].state = READING_STATE; + continue; + } + // Let's see whether we have a reply. + if (writebuf_.length() == 0) { + // No, nothing to do. + pipe_[i].pending_io = false; + continue; + } + // Yep, deliver it. + success = WriteFile(pipe_[i].handle, + writebuf_.c_str(), writebuf_.length(), + &status, &pipe_[i].overlap); + + if (success && status == writebuf_.length()) { + // The write operation completed successfully. + writebuf_.erase(); + pipe_[i].pending_io = false; + resetPipe(i); + continue; + } + + if (!success && (GetLastError() == ERROR_IO_PENDING)) { + // The write operation is still pending. + pipe_[i].pending_io = true; + continue; + } + + // Client closed connection (ERROR_NO_DATA) or + // an error occurred; in either case, reset the pipe. + if (GetLastError() != ERROR_NO_DATA) { + LYXERR0("LyXComm: Error sending message: " + << writebuf_ << '\n' << errormsg()); + resetPipe(i, true); + } else + resetPipe(i); break; } - if (connected || GetLastError() == ERROR_PIPE_CONNECTED) { - PipeEvent * event = new PipeEvent(inpipe); - QCoreApplication::postEvent(this, - static_cast(event)); - } else - CloseHandle(inpipe); } - CloseHandle(outpipe_); + + ready_ = false; + closeHandles(MAX_PIPES + 1); +} + + +void LyXComm::closeHandles(DWORD index) +{ + for (int i = 0; i <= index; ++i) { + if (event_[i]) { + ResetEvent(event_[i]); + CloseHandle(event_[i]); + } + if (pipe_[i].handle != INVALID_HANDLE_VALUE) + CloseHandle(pipe_[i].handle); + } } bool LyXComm::event(QEvent * e) { if (e->type() == QEvent::User) { - read_ready(static_cast(e)->pipe()); + read_ready(static_cast(e)->inpipe()); return true; } return false; } -BOOL LyXComm::checkStopServerEvent() +BOOL LyXComm::checkStopServer() { return WaitForSingleObject(stopserver_, 0) == WAIT_OBJECT_0 || closing_; } +void LyXComm::startPipe(DWORD index) +{ + pipe_[index].pending_io = false; + + // Overlapped ConnectNamedPipe should return zero. + if (ConnectNamedPipe(pipe_[index].handle, &pipe_[index].overlap)) { + // FIXME: What to do? Maybe the pipe server should be reset. + LYXERR0("LyXComm: Could not connect pipe " + << external_path(pipeName(index)) << '\n' + << errormsg()); + return; + } + + switch (GetLastError()) { + case ERROR_IO_PENDING: + // The overlapped connection is in progress. + pipe_[index].pending_io = true; + break; + + case ERROR_PIPE_CONNECTED: + // Client is already connected, so signal an event. + if (SetEvent(pipe_[index].overlap.hEvent)) + break; + default: + // Anything else is an error. + // FIXME: What to do? Maybe the pipe server should be reset. + LYXERR0("LyXComm: An error occurred while connecting pipe " + << external_path(pipeName(index)) << '\n' + << errormsg()); + } +} + + +void LyXComm::resetPipe(DWORD index, bool close_handle) +{ + // This method is called when an error occurs or when a client + // closes the connection. We first disconnect the pipe instance, + // then reconnect it, ready to wait for another client. + + if (!DisconnectNamedPipe(pipe_[index].handle)) { + LYXERR0("LyXComm: Could not disconnect pipe " + << external_path(pipeName(index)) << '\n' + << errormsg()); + // What to do now? Let's try whether re-creating the pipe helps. + close_handle = true; + } + if (close_handle) { + DWORD const open_mode = index < MAX_PIPES ? + PIPE_ACCESS_INBOUND : PIPE_ACCESS_OUTBOUND; + string const name = external_path(pipeName(index)); + + CloseHandle(pipe_[index].handle); + + pipe_[index].handle = CreateNamedPipe(name.c_str(), + open_mode | FILE_FLAG_OVERLAPPED, PIPE_WAIT, + MAX_PIPES, PIPE_BUFSIZE, PIPE_BUFSIZE, + PIPE_TIMEOUT, NULL); + + if (pipe_[index].handle == INVALID_HANDLE_VALUE) { + LYXERR0("LyXComm: Could not reset pipe " + << name << '\n' << errormsg()); + return; + } + if (index == MAX_PIPES) + writebuf_.erase(); + else + readbuf_[index].erase(); + } + startPipe(index); + pipe_[index].state = pipe_[index].pending_io ? + CONNECTING_STATE : (index < MAX_PIPES ? READING_STATE + : WRITING_STATE); +} + + void LyXComm::openConnection() { LYXERR(Debug::LYXSERVER, "LyXComm: Opening connection"); // If we are up, that's an error if (ready_) { - lyxerr << "LyXComm: Already connected" << endl; + LYXERR0("LyXComm: Already connected"); return; } // We assume that we don't make it @@ -218,30 +475,25 @@ void LyXComm::openConnection() return; } - // Check whether the pipe is being used by some other program. + // Check whether the pipe name is being used by some other program. if (!stopserver_ && WaitNamedPipe(inPipeName().c_str(), 0)) { - lyxerr << "LyXComm: Pipe " << external_path(inPipeName()) - << " already exists.\nMaybe another instance of LyX" - " is using it." - << endl; + LYXERR0("LyXComm: Pipe " << external_path(inPipeName()) + << " already exists.\nMaybe another instance of LyX" + " is using it."); pipename_.erase(); return; } + // Manual-reset event, initial state = not signaled stopserver_ = CreateEvent(NULL, TRUE, FALSE, NULL); - DWORD tid; - HANDLE thread = CreateThread(NULL, 0, pipeServerWrapper, - static_cast(this), 0, &tid); - if (!thread) { - lyxerr << "LyXComm: Could not create pipe server thread: " - << errormsg() << endl; + server_thread_ = CreateThread(NULL, 0, pipeServerWrapper, + static_cast(this), 0, NULL); + if (!server_thread_) { + LYXERR0("LyXComm: Could not create pipe server thread\n" + << errormsg()); + pipename_.erase(); return; - } else - CloseHandle(thread); - - // We made it! - ready_ = true; - LYXERR(Debug::LYXSERVER, "LyXComm: Connection established"); + } } @@ -260,83 +512,47 @@ void LyXComm::closeConnection() return; } - SetEvent(stopserver_); - HANDLE hpipe = CreateFile(external_path(inPipeName()).c_str(), - GENERIC_WRITE, 0, NULL, OPEN_EXISTING, - FILE_ATTRIBUTE_NORMAL, NULL); - if (hpipe != INVALID_HANDLE_VALUE) - CloseHandle(hpipe); - - ResetEvent(stopserver_); - CloseHandle(stopserver_); - ready_ = false; + emergencyCleanup(); } void LyXComm::emergencyCleanup() { - if (!pipename_.empty()) { + if (ready_) { SetEvent(stopserver_); - HANDLE hpipe = CreateFile(external_path(inPipeName()).c_str(), - GENERIC_WRITE, 0, NULL, OPEN_EXISTING, - FILE_ATTRIBUTE_NORMAL, NULL); - if (hpipe != INVALID_HANDLE_VALUE) - CloseHandle(hpipe); - + // Wait for the pipe server to finish + WaitForSingleObject(server_thread_, INFINITE); + CloseHandle(server_thread_); ResetEvent(stopserver_); CloseHandle(stopserver_); } } -void LyXComm::read_ready(HANDLE inpipe) + +void LyXComm::read_ready(DWORD inpipe) { - string read_buffer_; - read_buffer_.erase(); + // Turn the pipe buffer into a C string + DWORD const nbytes = pipe_[inpipe].nbytes; + pipe_[inpipe].pipebuf[nbytes] = '\0'; - int const charbuf_size = 100; - char charbuf[charbuf_size]; + readbuf_[inpipe] += rtrim(pipe_[inpipe].pipebuf, "\r"); - DWORD status; - while (ReadFile(inpipe, charbuf, charbuf_size - 1, &status, NULL)) { - if (status > 0) { - charbuf[status] = '\0'; // turn it into a c string - read_buffer_ += rtrim(charbuf, "\r"); - // commit any commands read - while (read_buffer_.find('\n') != string::npos) { - // split() grabs the entire string if - // the delim /wasn't/ found. ?:-P - string cmd; - read_buffer_= split(read_buffer_, cmd, '\n'); - cmd = rtrim(cmd, "\r"); - LYXERR(Debug::LYXSERVER, "LyXComm: status:" << status - << ", read_buffer_:" << read_buffer_ - << ", cmd:" << cmd); - if (!cmd.empty()) - clientcb_(client_, cmd); - //\n or not \n? - } - } - } - if (GetLastError() != ERROR_BROKEN_PIPE) { - // An error occurred - LYXERR0("LyXComm: " << errormsg()); - if (!read_buffer_.empty()) { - LYXERR0("LyXComm: truncated command: " << read_buffer_); - read_buffer_.erase(); - } + // Commit any commands read + while (readbuf_[inpipe].find('\n') != string::npos) { + // split() grabs the entire string if + // the delim /wasn't/ found. ?:-P + string cmd; + readbuf_[inpipe] = split(readbuf_[inpipe], cmd, '\n'); + cmd = rtrim(cmd, "\r"); + LYXERR(Debug::LYXSERVER, "LyXComm: status:" << nbytes + << ", readbuf_:" << readbuf_[inpipe] + << ", cmd:" << cmd); + if (!cmd.empty()) + clientcb_(client_, cmd); + //\n or not \n? } - // Client closed the pipe, so disconnect and close. - DisconnectNamedPipe(inpipe); - CloseHandle(inpipe); - FlushFileBuffers(outpipe_); - DisconnectNamedPipe(outpipe_); - // Temporarily change to non-blocking mode otherwise - // ConnectNamedPipe() would block waiting for a connection. - DWORD mode = PIPE_NOWAIT; - SetNamedPipeHandleState(outpipe_, &mode, NULL, NULL); - ConnectNamedPipe(outpipe_, NULL); - mode = PIPE_WAIT; - SetNamedPipeHandleState(outpipe_, &mode, NULL, NULL); + // Signal that we are done. + pipe_[inpipe].nbytes = 0; } @@ -349,45 +565,45 @@ void LyXComm::send(string const & msg) LYXERR(Debug::LYXSERVER, "LyXComm: Sending '" << msg << '\''); - if (pipename_.empty()) return; + if (pipename_.empty()) + return; if (!ready_) { LYXERR0("LyXComm: Pipes are closed. Could not send " << msg); return; } - bool success; - int count = 0; - do { - DWORD sent; - success = WriteFile(outpipe_, msg.c_str(), msg.length(), &sent, NULL); - if (!success) { - DWORD error = GetLastError(); - if (error == ERROR_NO_DATA) { - DisconnectNamedPipe(outpipe_); - DWORD mode = PIPE_NOWAIT; - SetNamedPipeHandleState(outpipe_, &mode, NULL, NULL); - ConnectNamedPipe(outpipe_, NULL); - mode = PIPE_WAIT; - SetNamedPipeHandleState(outpipe_, &mode, NULL, NULL); - } else if (error != ERROR_PIPE_LISTENING) - break; - } + // Wait a couple of secs for completion of a previous write operation. + for (int count = 0; writebuf_.length() && count < 20; ++count) Sleep(100); - ++count; - } while (!success && count < 100 && !checkStopServerEvent()); - if (!success) { - lyxerr << "LyXComm: Error sending message: " << msg - << '\n' << errormsg() - << "\nLyXComm: Resetting connection" << endl; + if (!writebuf_.length()) { + writebuf_ = msg; + // Tell the pipe server he has a job to do. + SetEvent(pipe_[MAX_PIPES].overlap.hEvent); + } else { + // Nope, output pipe is still busy. Most likely, a bad client + // did not care to read the answer (JabRef is one such client). + // Let's do a reset, otherwise the output pipe could remain + // stalled if the pipe server failed to reset it. + // This will remedy the output pipe stall, but the client will + // get a broken pipe error. + LYXERR0("LyXComm: Error sending message: " << msg + << "\nLyXComm: Output pipe is stalled\n" + "LyXComm: Resetting connection"); closeConnection(); - if (!checkStopServerEvent()) + if (!checkStopServer()) openConnection(); } } +string const LyXComm::pipeName(DWORD index) const +{ + return index < MAX_PIPES ? inPipeName() : outPipeName(); +} + + #elif !defined (HAVE_MKFIFO) // We provide a stub class that disables the lyxserver. diff --git a/src/Server.h b/src/Server.h index 31e7c4b7c4..337b9110a7 100644 --- a/src/Server.h +++ b/src/Server.h @@ -41,6 +41,31 @@ class LyXComm : public boost::signals::trackable { #else class LyXComm : public QObject { Q_OBJECT + + /// Max number of (read) pipe instances + enum { MAX_PIPES = 10 }; + + /// I/O buffer size + enum { PIPE_BUFSIZE = 512 }; + + /// Pipe client time-out + enum { PIPE_TIMEOUT = 5000 }; + + /// Pipe states + enum PipeState { + CONNECTING_STATE, + READING_STATE, + WRITING_STATE + }; + + typedef struct { + OVERLAPPED overlap; + HANDLE handle; + char pipebuf[PIPE_BUFSIZE]; + DWORD nbytes; + PipeState state; + bool pending_io; + } PipeInst; #endif public: /** When we receive a message, we send it to a client. @@ -65,7 +90,7 @@ public: #ifndef _WIN32 void read_ready(); #else - void read_ready(HANDLE); + void read_ready(DWORD); /// The pipe server void pipeServer(); @@ -97,7 +122,29 @@ private: /// This is -1 if not open int outfd_; #else - HANDLE outpipe_; + /// Start an overlapped connection + void startPipe(DWORD); + + /// Reset an overlapped connection + void resetPipe(DWORD, bool close_handle = false); + + /// Close event and pipe handles + void closeHandles(DWORD); + + /// The filename of a (in or out) pipe instance + std::string const pipeName(DWORD) const; + + /// Pipe instances + PipeInst pipe_[MAX_PIPES + 1]; + + /// Pipe server control events + HANDLE event_[MAX_PIPES + 2]; + + /// Request buffers + std::string readbuf_[MAX_PIPES]; + + /// Reply buffer + std::string writebuf_; #endif /// Are we up and running? @@ -117,10 +164,13 @@ private: bool event(QEvent *); /// Check whether the pipe server must be stopped - BOOL checkStopServerEvent(); + BOOL checkStopServer(); /// Windows event for stopping the pipe server HANDLE stopserver_; + + /// Pipe server thread handle + HANDLE server_thread_; #endif }; -- 2.39.2