]> git.lyx.org Git - features.git/blobdiff - src/Server.cpp
On Windows, use true asynchronous I/O for the pipe server.
[features.git] / src / Server.cpp
index 9837ef7aebd44ecb2dc0ed097ca7e59fc586f2bb..4c92c89cbf52a879c672386305a4718a1bb991cf 100644 (file)
@@ -49,6 +49,7 @@
 
 #include "support/debug.h"
 #include "support/FileName.h"
+#include "support/lassert.h"
 #include "support/lstrings.h"
 #include "support/os.h"
 
@@ -78,16 +79,16 @@ namespace lyx {
 
 #if defined(_WIN32)
 
-class PipeEvent : public QEvent {
+class ReadReadyEvent : public QEvent {
 public:
        ///
-       PipeEvent(HANDLE inpipe) : QEvent(QEvent::User), inpipe_(inpipe)
+       ReadReadyEvent(DWORD inpipe) : QEvent(QEvent::User), inpipe_(inpipe)
        {}
        ///
-       HANDLE pipe() const { return inpipe_; }
+       DWORD inpipe() const { return inpipe_; }
 
 private:
-       HANDLE inpipe_;
+       DWORD inpipe_;
 };
 
 namespace {
@@ -138,76 +139,332 @@ LyXComm::LyXComm(string const & pip, Server * cli, ClientCallbackfct ccb)
 
 void LyXComm::pipeServer()
 {
-       int const bufsize = 1024;
-       HANDLE inpipe;
-
-       outpipe_ = CreateNamedPipe(external_path(outPipeName()).c_str(),
-                                  PIPE_ACCESS_OUTBOUND, PIPE_NOWAIT,
-                                  PIPE_UNLIMITED_INSTANCES,
-                                  bufsize, 0, 0, NULL);
-       if (outpipe_ == INVALID_HANDLE_VALUE) {
-               lyxerr << "LyXComm: Could not create pipe "
-                      << external_path(outPipeName()) << '\n'
-                      << errormsg() << endl;
-               return;
+       DWORD i;
+
+       for (i = 0; i <= MAX_PIPES; ++i) {
+               DWORD open_mode;
+               string const pipename = external_path(pipeName(i));
+
+               if (i < MAX_PIPES) {
+                       open_mode = PIPE_ACCESS_INBOUND;
+                       readbuf_[i].erase();
+               } else {
+                       open_mode = PIPE_ACCESS_OUTBOUND;
+                       writebuf_.erase();
+               }
+
+               // Manual-reset event, initial state = signaled
+               event_[i] = CreateEvent(NULL, TRUE, TRUE, NULL);
+
+               if (!event_[i]) {
+                       LYXERR0("LyXComm: Could not create event for pipe "
+                               << pipename.c_str() << '\n' << errormsg());
+                       closeHandles(i);
+                       return;
+               }
+
+               pipe_[i].overlap.hEvent = event_[i];
+               pipe_[i].handle = CreateNamedPipe(pipename.c_str(),
+                               open_mode | FILE_FLAG_OVERLAPPED, PIPE_WAIT,
+                               MAX_PIPES, PIPE_BUFSIZE, PIPE_BUFSIZE,
+                               PIPE_TIMEOUT, NULL);
+
+               if (pipe_[i].handle == INVALID_HANDLE_VALUE) {
+                       LYXERR0("LyXComm: Could not create pipe "
+                               << pipename.c_str() << '\n' << errormsg());
+                       closeHandles(i);
+                       return;
+               }
+
+               startPipe(i);
+               pipe_[i].state = pipe_[i].pending_io ?
+                       CONNECTING_STATE : (i < MAX_PIPES ? READING_STATE
+                                                         : WRITING_STATE);
        }
-       ConnectNamedPipe(outpipe_, NULL);
-       // Now change to blocking mode
-       DWORD mode = PIPE_WAIT;
-       SetNamedPipeHandleState(outpipe_, &mode, NULL, NULL);
-
-       while (!checkStopServerEvent()) {
-               inpipe = CreateNamedPipe(external_path(inPipeName()).c_str(),
-                                        PIPE_ACCESS_INBOUND, PIPE_WAIT,
-                                        PIPE_UNLIMITED_INSTANCES,
-                                        0, bufsize, 0, NULL);
-               if (inpipe == INVALID_HANDLE_VALUE) {
-                       lyxerr << "LyXComm: Could not create pipe "
-                              << external_path(inPipeName()) << '\n'
-                              << errormsg() << endl;
+
+       // Add the stopserver_ event
+       event_[MAX_PIPES + 1] = stopserver_;
+
+       // We made it!
+       LYXERR(Debug::LYXSERVER, "LyXComm: Connection established");
+       ready_ = true;
+       DWORD status;
+       bool success;
+
+       while (!checkStopServer()) {
+               // Indefinitely wait for the completion of an overlapped
+               // read, write, or connect operation.
+               DWORD wait = WaitForMultipleObjects(MAX_PIPES + 2, event_,
+                                                   FALSE, INFINITE);
+
+               // Determine which pipe instance completed the operation.
+               i = wait - WAIT_OBJECT_0;
+               LASSERT(i >= 0 && i <= MAX_PIPES + 1, /**/);
+
+               // Check whether we were waked up for stopping the pipe server.
+               if (i == MAX_PIPES + 1)
                        break;
+
+               // Get the result if the operation was pending.
+               if (pipe_[i].pending_io) {
+                       success = GetOverlappedResult(pipe_[i].handle,
+                                       &pipe_[i].overlap, &status, FALSE);
+
+                       switch (pipe_[i].state) {
+                       case CONNECTING_STATE:
+                               // Pending connect operation
+                               if (!success) {
+                                       DWORD const err = GetLastError();
+                                       if (i == MAX_PIPES
+                                           && err == ERROR_IO_INCOMPLETE) {
+                                               // A reply on the output pipe
+                                               // has not been read, still.
+                                               // As we have only one instance
+                                               // for output, we risk a stalled
+                                               // pipe if no one reads it.
+                                               // So, if a reader doesn't
+                                               // appear within about 5 or 6
+                                               // seconds, we reset it.
+                                               static int count = 0;
+                                               Sleep(100);
+                                               if (++count == 50) {
+                                                       count = 0;
+                                                       resetPipe(i, true);
+                                               }
+                                       } else
+                                               LYXERR0("LyXComm: " << errormsg());
+                                       continue;
+                               }
+                               pipe_[i].state = i < MAX_PIPES ? READING_STATE
+                                                              : WRITING_STATE;
+                               break;
+
+                       case READING_STATE:
+                               // Pending read operation
+                               LASSERT(i < MAX_PIPES, /**/);
+                               if (!success || status == 0) {
+                                       resetPipe(i);
+                                       continue;
+                               }
+                               pipe_[i].nbytes = status;
+                               pipe_[i].state = WRITING_STATE;
+                               break;
+
+                       case WRITING_STATE:
+                               // Pending write operation
+                               LASSERT(i == MAX_PIPES, /**/);
+                               if (!success || status != writebuf_.length()) {
+                                       resetPipe(i);
+                                       continue;
+                               }
+                               break;
+                       }
                }
 
-               BOOL connected = ConnectNamedPipe(inpipe, NULL);
-               // Check whether we are signaled to shutdown the pipe server.
-               if (checkStopServerEvent()) {
-                       CloseHandle(inpipe);
+               // Operate according to the pipe state
+               switch (pipe_[i].state) {
+               case READING_STATE:
+                       // The pipe instance is connected to a client
+                       // and is ready to read a request.
+                       LASSERT(i < MAX_PIPES, /**/);
+                       success = ReadFile(pipe_[i].handle,
+                                       pipe_[i].pipebuf, PIPE_BUFSIZE - 1,
+                                       &pipe_[i].nbytes, &pipe_[i].overlap);
+
+                       if (success && pipe_[i].nbytes != 0) {
+                               // The read operation completed successfully.
+                               pipe_[i].pending_io = false;
+                               pipe_[i].state = WRITING_STATE;
+                               continue;
+                       }
+
+                       if (!success && GetLastError() == ERROR_IO_PENDING) {
+                               // The read operation is still pending.
+                               pipe_[i].pending_io = true;
+                               continue;
+                       }
+
+                       // Client closed connection (ERROR_BROKEN_PIPE) or
+                       // an error occurred; in either case, reset the pipe.
+                       if (GetLastError() != ERROR_BROKEN_PIPE) {
+                               LYXERR0("LyXComm: " << errormsg());
+                               if (!readbuf_[i].empty()) {
+                                       LYXERR0("LyXComm: truncated command: "
+                                               << readbuf_[i]);
+                                       readbuf_[i].erase();
+                               }
+                               resetPipe(i, true);
+                       } else
+                               resetPipe(i);
+                       break;
+
+               case WRITING_STATE:
+                       if (i < MAX_PIPES) {
+                               // The request was successfully read
+                               // from the client; commit it.
+                               ReadReadyEvent * event = new ReadReadyEvent(i);
+                               QCoreApplication::postEvent(this,
+                                               static_cast<QEvent *>(event));
+                               // Wait for completion
+                               while (pipe_[i].nbytes && !checkStopServer())
+                                       Sleep(100);
+                               pipe_[i].pending_io = false;
+                               pipe_[i].state = READING_STATE;
+                               continue;
+                       }
+                       // Let's see whether we have a reply.
+                       if (writebuf_.length() == 0) {
+                               // No, nothing to do.
+                               pipe_[i].pending_io = false;
+                               continue;
+                       }
+                       // Yep, deliver it.
+                       success = WriteFile(pipe_[i].handle,
+                                       writebuf_.c_str(), writebuf_.length(),
+                                       &status, &pipe_[i].overlap);
+
+                       if (success && status == writebuf_.length()) {
+                               // The write operation completed successfully.
+                               writebuf_.erase();
+                               pipe_[i].pending_io = false;
+                               resetPipe(i);
+                               continue;
+                       }
+
+                       if (!success && (GetLastError() == ERROR_IO_PENDING)) {
+                               // The write operation is still pending.
+                               pipe_[i].pending_io = true;
+                               continue;
+                       }
+
+                       // Client closed connection (ERROR_NO_DATA) or
+                       // an error occurred; in either case, reset the pipe.
+                       if (GetLastError() != ERROR_NO_DATA) {
+                               LYXERR0("LyXComm: Error sending message: "
+                                       << writebuf_ << '\n' << errormsg());
+                               resetPipe(i, true);
+                       } else
+                               resetPipe(i);
                        break;
                }
-               if (connected || GetLastError() == ERROR_PIPE_CONNECTED) {
-                       PipeEvent * event = new PipeEvent(inpipe);
-                       QCoreApplication::postEvent(this,
-                                       static_cast<QEvent *>(event));
-               } else
-                       CloseHandle(inpipe);
        }
-       CloseHandle(outpipe_);
+
+       ready_ = false;
+       closeHandles(MAX_PIPES + 1);
+}
+
+
+void LyXComm::closeHandles(DWORD index)
+{
+       for (int i = 0; i <= index; ++i) {
+               if (event_[i]) {
+                       ResetEvent(event_[i]);
+                       CloseHandle(event_[i]);
+               }
+               if (pipe_[i].handle != INVALID_HANDLE_VALUE)
+                       CloseHandle(pipe_[i].handle);
+       }
 }
 
 
 bool LyXComm::event(QEvent * e)
 {
        if (e->type() == QEvent::User) {
-               read_ready(static_cast<PipeEvent *>(e)->pipe());
+               read_ready(static_cast<ReadReadyEvent *>(e)->inpipe());
                return true;
        }
        return false;
 }
 
 
-BOOL LyXComm::checkStopServerEvent()
+BOOL LyXComm::checkStopServer()
 {
        return WaitForSingleObject(stopserver_, 0) == WAIT_OBJECT_0 || closing_;
 }
 
 
+void LyXComm::startPipe(DWORD index)
+{
+       pipe_[index].pending_io = false;
+
+       // Overlapped ConnectNamedPipe should return zero.
+       if (ConnectNamedPipe(pipe_[index].handle, &pipe_[index].overlap)) {
+               // FIXME: What to do? Maybe the pipe server should be reset.
+               LYXERR0("LyXComm: Could not connect pipe "
+                       << external_path(pipeName(index)) << '\n'
+                       << errormsg());
+               return;
+       }
+
+       switch (GetLastError()) {
+       case ERROR_IO_PENDING:
+               // The overlapped connection is in progress.
+               pipe_[index].pending_io = true;
+               break;
+
+       case ERROR_PIPE_CONNECTED:
+               // Client is already connected, so signal an event.
+               if (SetEvent(pipe_[index].overlap.hEvent))
+                       break;
+       default:
+               // Anything else is an error.
+               // FIXME: What to do? Maybe the pipe server should be reset.
+               LYXERR0("LyXComm: An error occurred while connecting pipe "
+                       << external_path(pipeName(index)) << '\n'
+                       << errormsg());
+       }
+}
+
+
+void LyXComm::resetPipe(DWORD index, bool close_handle)
+{
+       // This method is called when an error occurs or when a client
+       // closes the connection. We first disconnect the pipe instance,
+       // then reconnect it, ready to wait for another client.
+
+       if (!DisconnectNamedPipe(pipe_[index].handle)) {
+               LYXERR0("LyXComm: Could not disconnect pipe "
+                       << external_path(pipeName(index)) << '\n'
+                       << errormsg());
+               // What to do now? Let's try whether re-creating the pipe helps.
+               close_handle = true;
+       }
+       if (close_handle) {
+               DWORD const open_mode = index < MAX_PIPES ?
+                               PIPE_ACCESS_INBOUND : PIPE_ACCESS_OUTBOUND;
+               string const name = external_path(pipeName(index));
+
+               CloseHandle(pipe_[index].handle);
+
+               pipe_[index].handle = CreateNamedPipe(name.c_str(),
+                               open_mode | FILE_FLAG_OVERLAPPED, PIPE_WAIT,
+                               MAX_PIPES, PIPE_BUFSIZE, PIPE_BUFSIZE,
+                               PIPE_TIMEOUT, NULL);
+
+               if (pipe_[index].handle == INVALID_HANDLE_VALUE) {
+                       LYXERR0("LyXComm: Could not reset pipe "
+                               << name << '\n' << errormsg());
+                       return;
+               }
+               if (index == MAX_PIPES)
+                       writebuf_.erase();
+               else
+                       readbuf_[index].erase();
+       }
+       startPipe(index);
+       pipe_[index].state = pipe_[index].pending_io ?
+                       CONNECTING_STATE : (index < MAX_PIPES ? READING_STATE
+                                                             : WRITING_STATE);
+}
+
+
 void LyXComm::openConnection()
 {
        LYXERR(Debug::LYXSERVER, "LyXComm: Opening connection");
 
        // If we are up, that's an error
        if (ready_) {
-               lyxerr << "LyXComm: Already connected" << endl;
+               LYXERR0("LyXComm: Already connected");
                return;
        }
        // We assume that we don't make it
@@ -218,30 +475,25 @@ void LyXComm::openConnection()
                return;
        }
 
-       // Check whether the pipe is being used by some other program.
+       // Check whether the pipe name is being used by some other program.
        if (!stopserver_ && WaitNamedPipe(inPipeName().c_str(), 0)) {
-               lyxerr << "LyXComm: Pipe " << external_path(inPipeName())
-                      << " already exists.\nMaybe another instance of LyX"
-                         " is using it."
-                      << endl;
+               LYXERR0("LyXComm: Pipe " << external_path(inPipeName())
+                       << " already exists.\nMaybe another instance of LyX"
+                          " is using it.");
                pipename_.erase();
                return;
        }
 
+       // Manual-reset event, initial state = not signaled
        stopserver_ = CreateEvent(NULL, TRUE, FALSE, NULL);
-       DWORD tid;
-       HANDLE thread = CreateThread(NULL, 0, pipeServerWrapper,
-                                    static_cast<void *>(this), 0, &tid);
-       if (!thread) {
-               lyxerr << "LyXComm: Could not create pipe server thread: "
-                      << errormsg() << endl;
+       server_thread_ = CreateThread(NULL, 0, pipeServerWrapper,
+                                    static_cast<void *>(this), 0, NULL);
+       if (!server_thread_) {
+               LYXERR0("LyXComm: Could not create pipe server thread\n"
+                       << errormsg());
+               pipename_.erase();
                return;
-       } else
-               CloseHandle(thread);
-
-       // We made it!
-       ready_ = true;
-       LYXERR(Debug::LYXSERVER, "LyXComm: Connection established");
+       }
 }
 
 
@@ -260,83 +512,47 @@ void LyXComm::closeConnection()
                return;
        }
 
-       SetEvent(stopserver_);
-       HANDLE hpipe = CreateFile(external_path(inPipeName()).c_str(),
-                                 GENERIC_WRITE, 0, NULL, OPEN_EXISTING,
-                                 FILE_ATTRIBUTE_NORMAL, NULL);
-       if (hpipe != INVALID_HANDLE_VALUE)
-               CloseHandle(hpipe);
-
-       ResetEvent(stopserver_);
-       CloseHandle(stopserver_);
-       ready_ = false;
+       emergencyCleanup();
 }
 
 
 void LyXComm::emergencyCleanup()
 {
-       if (!pipename_.empty()) {
+       if (ready_) {
                SetEvent(stopserver_);
-               HANDLE hpipe = CreateFile(external_path(inPipeName()).c_str(),
-                                         GENERIC_WRITE, 0, NULL, OPEN_EXISTING,
-                                         FILE_ATTRIBUTE_NORMAL, NULL);
-               if (hpipe != INVALID_HANDLE_VALUE)
-                       CloseHandle(hpipe);
-
+               // Wait for the pipe server to finish
+               WaitForSingleObject(server_thread_, INFINITE);
+               CloseHandle(server_thread_);
                ResetEvent(stopserver_);
                CloseHandle(stopserver_);
        }
 }
 
-void LyXComm::read_ready(HANDLE inpipe)
+
+void LyXComm::read_ready(DWORD inpipe)
 {
-       string read_buffer_;
-       read_buffer_.erase();
+       // Turn the pipe buffer into a C string
+       DWORD const nbytes = pipe_[inpipe].nbytes;
+       pipe_[inpipe].pipebuf[nbytes] = '\0';
 
-       int const charbuf_size = 100;
-       char charbuf[charbuf_size];
+       readbuf_[inpipe] += rtrim(pipe_[inpipe].pipebuf, "\r");
 
-       DWORD status;
-       while (ReadFile(inpipe, charbuf, charbuf_size - 1, &status, NULL)) {
-               if (status > 0) {
-                       charbuf[status] = '\0'; // turn it into a c string
-                       read_buffer_ += rtrim(charbuf, "\r");
-                       // commit any commands read
-                       while (read_buffer_.find('\n') != string::npos) {
-                               // split() grabs the entire string if
-                               // the delim /wasn't/ found. ?:-P
-                               string cmd;
-                               read_buffer_= split(read_buffer_, cmd, '\n');
-                               cmd = rtrim(cmd, "\r");
-                               LYXERR(Debug::LYXSERVER, "LyXComm: status:" << status
-                                       << ", read_buffer_:" << read_buffer_
-                                       << ", cmd:" << cmd);
-                               if (!cmd.empty())
-                                       clientcb_(client_, cmd);
-                                       //\n or not \n?
-                       }
-               }
-       }
-       if (GetLastError() != ERROR_BROKEN_PIPE) {
-               // An error occurred
-               LYXERR0("LyXComm: " << errormsg());
-               if (!read_buffer_.empty()) {
-                       LYXERR0("LyXComm: truncated command: " << read_buffer_);
-                       read_buffer_.erase();
-               }
+       // Commit any commands read
+       while (readbuf_[inpipe].find('\n') != string::npos) {
+               // split() grabs the entire string if
+               // the delim /wasn't/ found. ?:-P
+               string cmd;
+               readbuf_[inpipe] = split(readbuf_[inpipe], cmd, '\n');
+               cmd = rtrim(cmd, "\r");
+               LYXERR(Debug::LYXSERVER, "LyXComm: status:" << nbytes
+                       << ", readbuf_:" << readbuf_[inpipe]
+                       << ", cmd:" << cmd);
+               if (!cmd.empty())
+                       clientcb_(client_, cmd);
+                       //\n or not \n?
        }
-       // Client closed the pipe, so disconnect and close.
-       DisconnectNamedPipe(inpipe);
-       CloseHandle(inpipe);
-       FlushFileBuffers(outpipe_);
-       DisconnectNamedPipe(outpipe_);
-       // Temporarily change to non-blocking mode otherwise
-       // ConnectNamedPipe() would block waiting for a connection.
-       DWORD mode = PIPE_NOWAIT;
-       SetNamedPipeHandleState(outpipe_, &mode, NULL, NULL);
-       ConnectNamedPipe(outpipe_, NULL);
-       mode = PIPE_WAIT;
-       SetNamedPipeHandleState(outpipe_, &mode, NULL, NULL);
+       // Signal that we are done.
+       pipe_[inpipe].nbytes = 0;
 }
 
 
@@ -349,45 +565,45 @@ void LyXComm::send(string const & msg)
 
        LYXERR(Debug::LYXSERVER, "LyXComm: Sending '" << msg << '\'');
 
-       if (pipename_.empty()) return;
+       if (pipename_.empty())
+               return;
 
        if (!ready_) {
                LYXERR0("LyXComm: Pipes are closed. Could not send " << msg);
                return;
        }
 
-       bool success;
-       int count = 0;
-       do {
-               DWORD sent;
-               success = WriteFile(outpipe_, msg.c_str(), msg.length(), &sent, NULL);
-               if (!success) {
-                       DWORD error = GetLastError();
-                       if (error == ERROR_NO_DATA) {
-                               DisconnectNamedPipe(outpipe_);
-                               DWORD mode = PIPE_NOWAIT;
-                               SetNamedPipeHandleState(outpipe_, &mode, NULL, NULL);
-                               ConnectNamedPipe(outpipe_, NULL);
-                               mode = PIPE_WAIT;
-                               SetNamedPipeHandleState(outpipe_, &mode, NULL, NULL);
-                       } else if (error != ERROR_PIPE_LISTENING)
-                               break;
-               }
+       // Wait a couple of secs for completion of a previous write operation.
+       for (int count = 0; writebuf_.length() && count < 20; ++count)
                Sleep(100);
-               ++count;
-       } while (!success && count < 100 && !checkStopServerEvent());
 
-       if (!success) {
-               lyxerr << "LyXComm: Error sending message: " << msg
-                      << '\n' << errormsg()
-                      << "\nLyXComm: Resetting connection" << endl;
+       if (!writebuf_.length()) {
+               writebuf_ = msg;
+               // Tell the pipe server he has a job to do.
+               SetEvent(pipe_[MAX_PIPES].overlap.hEvent);
+       } else {
+               // Nope, output pipe is still busy. Most likely, a bad client
+               // did not care to read the answer (JabRef is one such client).
+               // Let's do a reset, otherwise the output pipe could remain
+               // stalled if the pipe server failed to reset it.
+               // This will remedy the output pipe stall, but the client will
+               // get a broken pipe error.
+               LYXERR0("LyXComm: Error sending message: " << msg
+                       << "\nLyXComm: Output pipe is stalled\n"
+                          "LyXComm: Resetting connection");
                closeConnection();
-               if (!checkStopServerEvent())
+               if (!checkStopServer())
                        openConnection();
        }
 }
 
 
+string const LyXComm::pipeName(DWORD index) const
+{
+       return index < MAX_PIPES ? inPipeName() : outPipeName();
+}
+
+
 #elif !defined (HAVE_MKFIFO)
 // We provide a stub class that disables the lyxserver.