#include "support/debug.h"
#include "support/FileName.h"
+#include "support/lassert.h"
#include "support/lstrings.h"
#include "support/os.h"
#if defined(_WIN32)
-class PipeEvent : public QEvent {
+class ReadReadyEvent : public QEvent {
public:
///
- PipeEvent(HANDLE inpipe) : QEvent(QEvent::User), inpipe_(inpipe)
+ ReadReadyEvent(DWORD inpipe) : QEvent(QEvent::User), inpipe_(inpipe)
{}
///
- HANDLE pipe() const { return inpipe_; }
+ DWORD inpipe() const { return inpipe_; }
private:
- HANDLE inpipe_;
+ DWORD inpipe_;
};
namespace {
void LyXComm::pipeServer()
{
- int const bufsize = 1024;
- HANDLE inpipe;
-
- outpipe_ = CreateNamedPipe(external_path(outPipeName()).c_str(),
- PIPE_ACCESS_OUTBOUND, PIPE_NOWAIT,
- PIPE_UNLIMITED_INSTANCES,
- bufsize, 0, 0, NULL);
- if (outpipe_ == INVALID_HANDLE_VALUE) {
- lyxerr << "LyXComm: Could not create pipe "
- << external_path(outPipeName()) << '\n'
- << errormsg() << endl;
- return;
+ DWORD i;
+
+ for (i = 0; i <= MAX_PIPES; ++i) {
+ DWORD open_mode;
+ string const pipename = external_path(pipeName(i));
+
+ if (i < MAX_PIPES) {
+ open_mode = PIPE_ACCESS_INBOUND;
+ readbuf_[i].erase();
+ } else {
+ open_mode = PIPE_ACCESS_OUTBOUND;
+ writebuf_.erase();
+ }
+
+ // Manual-reset event, initial state = signaled
+ event_[i] = CreateEvent(NULL, TRUE, TRUE, NULL);
+
+ if (!event_[i]) {
+ LYXERR0("LyXComm: Could not create event for pipe "
+ << pipename.c_str() << '\n' << errormsg());
+ closeHandles(i);
+ return;
+ }
+
+ pipe_[i].overlap.hEvent = event_[i];
+ pipe_[i].handle = CreateNamedPipe(pipename.c_str(),
+ open_mode | FILE_FLAG_OVERLAPPED, PIPE_WAIT,
+ MAX_PIPES, PIPE_BUFSIZE, PIPE_BUFSIZE,
+ PIPE_TIMEOUT, NULL);
+
+ if (pipe_[i].handle == INVALID_HANDLE_VALUE) {
+ LYXERR0("LyXComm: Could not create pipe "
+ << pipename.c_str() << '\n' << errormsg());
+ closeHandles(i);
+ return;
+ }
+
+ startPipe(i);
+ pipe_[i].state = pipe_[i].pending_io ?
+ CONNECTING_STATE : (i < MAX_PIPES ? READING_STATE
+ : WRITING_STATE);
}
- ConnectNamedPipe(outpipe_, NULL);
- // Now change to blocking mode
- DWORD mode = PIPE_WAIT;
- SetNamedPipeHandleState(outpipe_, &mode, NULL, NULL);
-
- while (!checkStopServerEvent()) {
- inpipe = CreateNamedPipe(external_path(inPipeName()).c_str(),
- PIPE_ACCESS_INBOUND, PIPE_WAIT,
- PIPE_UNLIMITED_INSTANCES,
- 0, bufsize, 0, NULL);
- if (inpipe == INVALID_HANDLE_VALUE) {
- lyxerr << "LyXComm: Could not create pipe "
- << external_path(inPipeName()) << '\n'
- << errormsg() << endl;
+
+ // Add the stopserver_ event
+ event_[MAX_PIPES + 1] = stopserver_;
+
+ // We made it!
+ LYXERR(Debug::LYXSERVER, "LyXComm: Connection established");
+ ready_ = true;
+ DWORD status;
+ bool success;
+
+ while (!checkStopServer()) {
+ // Indefinitely wait for the completion of an overlapped
+ // read, write, or connect operation.
+ DWORD wait = WaitForMultipleObjects(MAX_PIPES + 2, event_,
+ FALSE, INFINITE);
+
+ // Determine which pipe instance completed the operation.
+ i = wait - WAIT_OBJECT_0;
+ LASSERT(i >= 0 && i <= MAX_PIPES + 1, /**/);
+
+ // Check whether we were waked up for stopping the pipe server.
+ if (i == MAX_PIPES + 1)
break;
+
+ // Get the result if the operation was pending.
+ if (pipe_[i].pending_io) {
+ success = GetOverlappedResult(pipe_[i].handle,
+ &pipe_[i].overlap, &status, FALSE);
+
+ switch (pipe_[i].state) {
+ case CONNECTING_STATE:
+ // Pending connect operation
+ if (!success) {
+ DWORD const err = GetLastError();
+ if (i == MAX_PIPES
+ && err == ERROR_IO_INCOMPLETE) {
+ // A reply on the output pipe
+ // has not been read, still.
+ // As we have only one instance
+ // for output, we risk a stalled
+ // pipe if no one reads it.
+ // So, if a reader doesn't
+ // appear within about 5 or 6
+ // seconds, we reset it.
+ static int count = 0;
+ Sleep(100);
+ if (++count == 50) {
+ count = 0;
+ resetPipe(i, true);
+ }
+ } else
+ LYXERR0("LyXComm: " << errormsg());
+ continue;
+ }
+ pipe_[i].state = i < MAX_PIPES ? READING_STATE
+ : WRITING_STATE;
+ break;
+
+ case READING_STATE:
+ // Pending read operation
+ LASSERT(i < MAX_PIPES, /**/);
+ if (!success || status == 0) {
+ resetPipe(i);
+ continue;
+ }
+ pipe_[i].nbytes = status;
+ pipe_[i].state = WRITING_STATE;
+ break;
+
+ case WRITING_STATE:
+ // Pending write operation
+ LASSERT(i == MAX_PIPES, /**/);
+ if (!success || status != writebuf_.length()) {
+ resetPipe(i);
+ continue;
+ }
+ break;
+ }
}
- BOOL connected = ConnectNamedPipe(inpipe, NULL);
- // Check whether we are signaled to shutdown the pipe server.
- if (checkStopServerEvent()) {
- CloseHandle(inpipe);
+ // Operate according to the pipe state
+ switch (pipe_[i].state) {
+ case READING_STATE:
+ // The pipe instance is connected to a client
+ // and is ready to read a request.
+ LASSERT(i < MAX_PIPES, /**/);
+ success = ReadFile(pipe_[i].handle,
+ pipe_[i].pipebuf, PIPE_BUFSIZE - 1,
+ &pipe_[i].nbytes, &pipe_[i].overlap);
+
+ if (success && pipe_[i].nbytes != 0) {
+ // The read operation completed successfully.
+ pipe_[i].pending_io = false;
+ pipe_[i].state = WRITING_STATE;
+ continue;
+ }
+
+ if (!success && GetLastError() == ERROR_IO_PENDING) {
+ // The read operation is still pending.
+ pipe_[i].pending_io = true;
+ continue;
+ }
+
+ // Client closed connection (ERROR_BROKEN_PIPE) or
+ // an error occurred; in either case, reset the pipe.
+ if (GetLastError() != ERROR_BROKEN_PIPE) {
+ LYXERR0("LyXComm: " << errormsg());
+ if (!readbuf_[i].empty()) {
+ LYXERR0("LyXComm: truncated command: "
+ << readbuf_[i]);
+ readbuf_[i].erase();
+ }
+ resetPipe(i, true);
+ } else
+ resetPipe(i);
+ break;
+
+ case WRITING_STATE:
+ if (i < MAX_PIPES) {
+ // The request was successfully read
+ // from the client; commit it.
+ ReadReadyEvent * event = new ReadReadyEvent(i);
+ QCoreApplication::postEvent(this,
+ static_cast<QEvent *>(event));
+ // Wait for completion
+ while (pipe_[i].nbytes && !checkStopServer())
+ Sleep(100);
+ pipe_[i].pending_io = false;
+ pipe_[i].state = READING_STATE;
+ continue;
+ }
+ // Let's see whether we have a reply.
+ if (writebuf_.length() == 0) {
+ // No, nothing to do.
+ pipe_[i].pending_io = false;
+ continue;
+ }
+ // Yep, deliver it.
+ success = WriteFile(pipe_[i].handle,
+ writebuf_.c_str(), writebuf_.length(),
+ &status, &pipe_[i].overlap);
+
+ if (success && status == writebuf_.length()) {
+ // The write operation completed successfully.
+ writebuf_.erase();
+ pipe_[i].pending_io = false;
+ resetPipe(i);
+ continue;
+ }
+
+ if (!success && (GetLastError() == ERROR_IO_PENDING)) {
+ // The write operation is still pending.
+ pipe_[i].pending_io = true;
+ continue;
+ }
+
+ // Client closed connection (ERROR_NO_DATA) or
+ // an error occurred; in either case, reset the pipe.
+ if (GetLastError() != ERROR_NO_DATA) {
+ LYXERR0("LyXComm: Error sending message: "
+ << writebuf_ << '\n' << errormsg());
+ resetPipe(i, true);
+ } else
+ resetPipe(i);
break;
}
- if (connected || GetLastError() == ERROR_PIPE_CONNECTED) {
- PipeEvent * event = new PipeEvent(inpipe);
- QCoreApplication::postEvent(this,
- static_cast<QEvent *>(event));
- } else
- CloseHandle(inpipe);
}
- CloseHandle(outpipe_);
+
+ ready_ = false;
+ closeHandles(MAX_PIPES + 1);
+}
+
+
+void LyXComm::closeHandles(DWORD index)
+{
+ for (int i = 0; i <= index; ++i) {
+ if (event_[i]) {
+ ResetEvent(event_[i]);
+ CloseHandle(event_[i]);
+ }
+ if (pipe_[i].handle != INVALID_HANDLE_VALUE)
+ CloseHandle(pipe_[i].handle);
+ }
}
bool LyXComm::event(QEvent * e)
{
if (e->type() == QEvent::User) {
- read_ready(static_cast<PipeEvent *>(e)->pipe());
+ read_ready(static_cast<ReadReadyEvent *>(e)->inpipe());
return true;
}
return false;
}
-BOOL LyXComm::checkStopServerEvent()
+BOOL LyXComm::checkStopServer()
{
return WaitForSingleObject(stopserver_, 0) == WAIT_OBJECT_0 || closing_;
}
+void LyXComm::startPipe(DWORD index)
+{
+ pipe_[index].pending_io = false;
+
+ // Overlapped ConnectNamedPipe should return zero.
+ if (ConnectNamedPipe(pipe_[index].handle, &pipe_[index].overlap)) {
+ // FIXME: What to do? Maybe the pipe server should be reset.
+ LYXERR0("LyXComm: Could not connect pipe "
+ << external_path(pipeName(index)) << '\n'
+ << errormsg());
+ return;
+ }
+
+ switch (GetLastError()) {
+ case ERROR_IO_PENDING:
+ // The overlapped connection is in progress.
+ pipe_[index].pending_io = true;
+ break;
+
+ case ERROR_PIPE_CONNECTED:
+ // Client is already connected, so signal an event.
+ if (SetEvent(pipe_[index].overlap.hEvent))
+ break;
+ default:
+ // Anything else is an error.
+ // FIXME: What to do? Maybe the pipe server should be reset.
+ LYXERR0("LyXComm: An error occurred while connecting pipe "
+ << external_path(pipeName(index)) << '\n'
+ << errormsg());
+ }
+}
+
+
+void LyXComm::resetPipe(DWORD index, bool close_handle)
+{
+ // This method is called when an error occurs or when a client
+ // closes the connection. We first disconnect the pipe instance,
+ // then reconnect it, ready to wait for another client.
+
+ if (!DisconnectNamedPipe(pipe_[index].handle)) {
+ LYXERR0("LyXComm: Could not disconnect pipe "
+ << external_path(pipeName(index)) << '\n'
+ << errormsg());
+ // What to do now? Let's try whether re-creating the pipe helps.
+ close_handle = true;
+ }
+ if (close_handle) {
+ DWORD const open_mode = index < MAX_PIPES ?
+ PIPE_ACCESS_INBOUND : PIPE_ACCESS_OUTBOUND;
+ string const name = external_path(pipeName(index));
+
+ CloseHandle(pipe_[index].handle);
+
+ pipe_[index].handle = CreateNamedPipe(name.c_str(),
+ open_mode | FILE_FLAG_OVERLAPPED, PIPE_WAIT,
+ MAX_PIPES, PIPE_BUFSIZE, PIPE_BUFSIZE,
+ PIPE_TIMEOUT, NULL);
+
+ if (pipe_[index].handle == INVALID_HANDLE_VALUE) {
+ LYXERR0("LyXComm: Could not reset pipe "
+ << name << '\n' << errormsg());
+ return;
+ }
+ if (index == MAX_PIPES)
+ writebuf_.erase();
+ else
+ readbuf_[index].erase();
+ }
+ startPipe(index);
+ pipe_[index].state = pipe_[index].pending_io ?
+ CONNECTING_STATE : (index < MAX_PIPES ? READING_STATE
+ : WRITING_STATE);
+}
+
+
void LyXComm::openConnection()
{
LYXERR(Debug::LYXSERVER, "LyXComm: Opening connection");
// If we are up, that's an error
if (ready_) {
- lyxerr << "LyXComm: Already connected" << endl;
+ LYXERR0("LyXComm: Already connected");
return;
}
// We assume that we don't make it
return;
}
- // Check whether the pipe is being used by some other program.
+ // Check whether the pipe name is being used by some other program.
if (!stopserver_ && WaitNamedPipe(inPipeName().c_str(), 0)) {
- lyxerr << "LyXComm: Pipe " << external_path(inPipeName())
- << " already exists.\nMaybe another instance of LyX"
- " is using it."
- << endl;
+ LYXERR0("LyXComm: Pipe " << external_path(inPipeName())
+ << " already exists.\nMaybe another instance of LyX"
+ " is using it.");
pipename_.erase();
return;
}
+ // Manual-reset event, initial state = not signaled
stopserver_ = CreateEvent(NULL, TRUE, FALSE, NULL);
- DWORD tid;
- HANDLE thread = CreateThread(NULL, 0, pipeServerWrapper,
- static_cast<void *>(this), 0, &tid);
- if (!thread) {
- lyxerr << "LyXComm: Could not create pipe server thread: "
- << errormsg() << endl;
+ server_thread_ = CreateThread(NULL, 0, pipeServerWrapper,
+ static_cast<void *>(this), 0, NULL);
+ if (!server_thread_) {
+ LYXERR0("LyXComm: Could not create pipe server thread\n"
+ << errormsg());
+ pipename_.erase();
return;
- } else
- CloseHandle(thread);
-
- // We made it!
- ready_ = true;
- LYXERR(Debug::LYXSERVER, "LyXComm: Connection established");
+ }
}
return;
}
- SetEvent(stopserver_);
- HANDLE hpipe = CreateFile(external_path(inPipeName()).c_str(),
- GENERIC_WRITE, 0, NULL, OPEN_EXISTING,
- FILE_ATTRIBUTE_NORMAL, NULL);
- if (hpipe != INVALID_HANDLE_VALUE)
- CloseHandle(hpipe);
-
- ResetEvent(stopserver_);
- CloseHandle(stopserver_);
- ready_ = false;
+ emergencyCleanup();
}
void LyXComm::emergencyCleanup()
{
- if (!pipename_.empty()) {
+ if (ready_) {
SetEvent(stopserver_);
- HANDLE hpipe = CreateFile(external_path(inPipeName()).c_str(),
- GENERIC_WRITE, 0, NULL, OPEN_EXISTING,
- FILE_ATTRIBUTE_NORMAL, NULL);
- if (hpipe != INVALID_HANDLE_VALUE)
- CloseHandle(hpipe);
-
+ // Wait for the pipe server to finish
+ WaitForSingleObject(server_thread_, INFINITE);
+ CloseHandle(server_thread_);
ResetEvent(stopserver_);
CloseHandle(stopserver_);
}
}
-void LyXComm::read_ready(HANDLE inpipe)
+
+void LyXComm::read_ready(DWORD inpipe)
{
- string read_buffer_;
- read_buffer_.erase();
+ // Turn the pipe buffer into a C string
+ DWORD const nbytes = pipe_[inpipe].nbytes;
+ pipe_[inpipe].pipebuf[nbytes] = '\0';
- int const charbuf_size = 100;
- char charbuf[charbuf_size];
+ readbuf_[inpipe] += rtrim(pipe_[inpipe].pipebuf, "\r");
- DWORD status;
- while (ReadFile(inpipe, charbuf, charbuf_size - 1, &status, NULL)) {
- if (status > 0) {
- charbuf[status] = '\0'; // turn it into a c string
- read_buffer_ += rtrim(charbuf, "\r");
- // commit any commands read
- while (read_buffer_.find('\n') != string::npos) {
- // split() grabs the entire string if
- // the delim /wasn't/ found. ?:-P
- string cmd;
- read_buffer_= split(read_buffer_, cmd, '\n');
- cmd = rtrim(cmd, "\r");
- LYXERR(Debug::LYXSERVER, "LyXComm: status:" << status
- << ", read_buffer_:" << read_buffer_
- << ", cmd:" << cmd);
- if (!cmd.empty())
- clientcb_(client_, cmd);
- //\n or not \n?
- }
- }
- }
- if (GetLastError() != ERROR_BROKEN_PIPE) {
- // An error occurred
- LYXERR0("LyXComm: " << errormsg());
- if (!read_buffer_.empty()) {
- LYXERR0("LyXComm: truncated command: " << read_buffer_);
- read_buffer_.erase();
- }
+ // Commit any commands read
+ while (readbuf_[inpipe].find('\n') != string::npos) {
+ // split() grabs the entire string if
+ // the delim /wasn't/ found. ?:-P
+ string cmd;
+ readbuf_[inpipe] = split(readbuf_[inpipe], cmd, '\n');
+ cmd = rtrim(cmd, "\r");
+ LYXERR(Debug::LYXSERVER, "LyXComm: status:" << nbytes
+ << ", readbuf_:" << readbuf_[inpipe]
+ << ", cmd:" << cmd);
+ if (!cmd.empty())
+ clientcb_(client_, cmd);
+ //\n or not \n?
}
- // Client closed the pipe, so disconnect and close.
- DisconnectNamedPipe(inpipe);
- CloseHandle(inpipe);
- FlushFileBuffers(outpipe_);
- DisconnectNamedPipe(outpipe_);
- // Temporarily change to non-blocking mode otherwise
- // ConnectNamedPipe() would block waiting for a connection.
- DWORD mode = PIPE_NOWAIT;
- SetNamedPipeHandleState(outpipe_, &mode, NULL, NULL);
- ConnectNamedPipe(outpipe_, NULL);
- mode = PIPE_WAIT;
- SetNamedPipeHandleState(outpipe_, &mode, NULL, NULL);
+ // Signal that we are done.
+ pipe_[inpipe].nbytes = 0;
}
LYXERR(Debug::LYXSERVER, "LyXComm: Sending '" << msg << '\'');
- if (pipename_.empty()) return;
+ if (pipename_.empty())
+ return;
if (!ready_) {
LYXERR0("LyXComm: Pipes are closed. Could not send " << msg);
return;
}
- bool success;
- int count = 0;
- do {
- DWORD sent;
- success = WriteFile(outpipe_, msg.c_str(), msg.length(), &sent, NULL);
- if (!success) {
- DWORD error = GetLastError();
- if (error == ERROR_NO_DATA) {
- DisconnectNamedPipe(outpipe_);
- DWORD mode = PIPE_NOWAIT;
- SetNamedPipeHandleState(outpipe_, &mode, NULL, NULL);
- ConnectNamedPipe(outpipe_, NULL);
- mode = PIPE_WAIT;
- SetNamedPipeHandleState(outpipe_, &mode, NULL, NULL);
- } else if (error != ERROR_PIPE_LISTENING)
- break;
- }
+ // Wait a couple of secs for completion of a previous write operation.
+ for (int count = 0; writebuf_.length() && count < 20; ++count)
Sleep(100);
- ++count;
- } while (!success && count < 100 && !checkStopServerEvent());
- if (!success) {
- lyxerr << "LyXComm: Error sending message: " << msg
- << '\n' << errormsg()
- << "\nLyXComm: Resetting connection" << endl;
+ if (!writebuf_.length()) {
+ writebuf_ = msg;
+ // Tell the pipe server he has a job to do.
+ SetEvent(pipe_[MAX_PIPES].overlap.hEvent);
+ } else {
+ // Nope, output pipe is still busy. Most likely, a bad client
+ // did not care to read the answer (JabRef is one such client).
+ // Let's do a reset, otherwise the output pipe could remain
+ // stalled if the pipe server failed to reset it.
+ // This will remedy the output pipe stall, but the client will
+ // get a broken pipe error.
+ LYXERR0("LyXComm: Error sending message: " << msg
+ << "\nLyXComm: Output pipe is stalled\n"
+ "LyXComm: Resetting connection");
closeConnection();
- if (!checkStopServerEvent())
+ if (!checkStopServer())
openConnection();
}
}
+string const LyXComm::pipeName(DWORD index) const
+{
+ return index < MAX_PIPES ? inPipeName() : outPipeName();
+}
+
+
#elif !defined (HAVE_MKFIFO)
// We provide a stub class that disables the lyxserver.