This commit is contained in:
Jacob Dufault 2017-03-19 15:08:36 -07:00
parent 4cec26ae12
commit 4bd284ee7e
10 changed files with 69 additions and 75 deletions

View File

@ -33,15 +33,15 @@ struct BufferLocal : public Buffer {
struct ScopedLockPlatform : public ScopedLock { struct ScopedLockPlatform : public ScopedLock {
ScopedLockPlatform(PlatformMutex* mutex) ScopedLockPlatform(PlatformMutex* mutex)
: guard(CreatePlatformScopedMutexLock(mutex)) {} : guard(CreatePlatformScopedMutexLock(mutex)) {}
std::unique_ptr<PlatformScopedMutexLock> guard; std::unique_ptr<PlatformScopedMutexLock> guard;
}; };
struct BufferPlatform : public Buffer { struct BufferPlatform : public Buffer {
explicit BufferPlatform(const std::string& name, size_t capacity) explicit BufferPlatform(const std::string& name, size_t capacity)
: memory_(CreatePlatformSharedMemory(name + "_mem", capacity)), : memory_(CreatePlatformSharedMemory(name + "_mem", capacity)),
mutex_(CreatePlatformMutex(name + "_mtx")) { mutex_(CreatePlatformMutex(name + "_mtx")) {
this->data = memory_->data; this->data = memory_->data;
this->capacity = memory_->capacity; this->capacity = memory_->capacity;
} }
@ -65,7 +65,8 @@ std::unique_ptr<Buffer> Buffer::Create(size_t capacity) {
return MakeUnique<BufferLocal>(capacity); return MakeUnique<BufferLocal>(capacity);
} }
std::unique_ptr<Buffer> Buffer::CreateSharedBuffer(const std::string& name, size_t capacity) { std::unique_ptr<Buffer> Buffer::CreateSharedBuffer(const std::string& name,
size_t capacity) {
return MakeUnique<BufferPlatform>(name, capacity); return MakeUnique<BufferPlatform>(name, capacity);
} }
@ -82,10 +83,8 @@ TEST_CASE("create") {
} }
TEST_CASE("lock") { TEST_CASE("lock") {
auto buffers = { auto buffers = {Buffer::Create(sizeof(int)),
Buffer::Create(sizeof(int)), Buffer::CreateSharedBuffer("indexertest", sizeof(int))};
Buffer::CreateSharedBuffer("indexertest", sizeof(int))
};
for (auto& b : buffers) { for (auto& b : buffers) {
int* data = reinterpret_cast<int*>(b->data); int* data = reinterpret_cast<int*>(b->data);

View File

@ -27,4 +27,3 @@ struct Buffer {
void* data = nullptr; void* data = nullptr;
size_t capacity = 0; size_t capacity = 0;
}; };

View File

@ -15,22 +15,19 @@ struct MessageQueue::BufferMetadata {
// The total number of bytes currently used for messages. This does not // The total number of bytes currently used for messages. This does not
// include the sizeof the buffer metadata. // include the sizeof the buffer metadata.
size_t total_message_bytes() { size_t total_message_bytes() { return total_message_bytes_; }
return total_message_bytes_;
}
private: private:
size_t total_message_bytes_ = 0; size_t total_message_bytes_ = 0;
}; };
MessageQueue::MessageQueue(std::unique_ptr<Buffer> buffer, bool buffer_has_data) : buffer_(std::move(buffer)) { MessageQueue::MessageQueue(std::unique_ptr<Buffer> buffer, bool buffer_has_data)
: buffer_(std::move(buffer)) {
if (!buffer_has_data) if (!buffer_has_data)
new(buffer_->data) BufferMetadata(); new (buffer_->data) BufferMetadata();
} }
void MessageQueue::Enqueue(const Message& message) { void MessageQueue::Enqueue(const Message& message) {}
}
MessageQueue::BufferMetadata* MessageQueue::Metadata() { MessageQueue::BufferMetadata* MessageQueue::Metadata() {
return reinterpret_cast<BufferMetadata*>(buffer_->data); return reinterpret_cast<BufferMetadata*>(buffer_->data);

View File

@ -22,7 +22,7 @@ struct Message {
struct MessageQueue { struct MessageQueue {
// Create a new MessageQueue using |buffer| as the backing data storage. // Create a new MessageQueue using |buffer| as the backing data storage.
// This does *not* take ownership over the memory stored in |buffer|. // This does *not* take ownership over the memory stored in |buffer|.
// //
// If |buffer_has_data| is true, then it is assumed that |buffer| contains // If |buffer_has_data| is true, then it is assumed that |buffer| contains
// data and has already been initialized. It is a perfectly acceptable // data and has already been initialized. It is a perfectly acceptable
// use-case to have multiple completely separate MessageQueue // use-case to have multiple completely separate MessageQueue
@ -52,7 +52,7 @@ struct MessageQueue {
// Take the first available message from the queue. // Take the first available message from the queue.
std::unique_ptr<Message> DequeueFirst(); std::unique_ptr<Message> DequeueFirst();
private: private:
struct BufferMetadata; struct BufferMetadata;
BufferMetadata* Metadata(); BufferMetadata* Metadata();
@ -60,7 +60,6 @@ private:
std::unique_ptr<Buffer> buffer_; std::unique_ptr<Buffer> buffer_;
}; };
/* /*
// TODO: We convert IpcMessage <-> Message as a user-level operation. // TODO: We convert IpcMessage <-> Message as a user-level operation.
// MessageQueue doesn't know about IpcMessage. // MessageQueue doesn't know about IpcMessage.

View File

@ -4,10 +4,8 @@
#include "../third_party/doctest/doctest/doctest.h" #include "../third_party/doctest/doctest/doctest.h"
TEST_SUITE("Platform"); TEST_SUITE("Platform");
TEST_CASE("Mutex lock/unlock (single process)") { TEST_CASE("Mutex lock/unlock (single process)") {
auto m1 = CreatePlatformMutex("indexer-platformmutexttest"); auto m1 = CreatePlatformMutex("indexer-platformmutexttest");
auto l1 = CreatePlatformScopedMutexLock(m1.get()); auto l1 = CreatePlatformScopedMutexLock(m1.get());

View File

@ -17,7 +17,10 @@ struct PlatformSharedMemory {
}; };
std::unique_ptr<PlatformMutex> CreatePlatformMutex(const std::string& name); std::unique_ptr<PlatformMutex> CreatePlatformMutex(const std::string& name);
std::unique_ptr<PlatformScopedMutexLock> CreatePlatformScopedMutexLock(PlatformMutex* mutex); std::unique_ptr<PlatformScopedMutexLock> CreatePlatformScopedMutexLock(
std::unique_ptr<PlatformSharedMemory> CreatePlatformSharedMemory(const std::string& name, size_t size); PlatformMutex* mutex);
std::unique_ptr<PlatformSharedMemory> CreatePlatformSharedMemory(
const std::string& name,
size_t size);
std::string GetWorkingDirectory(); std::string GetWorkingDirectory();

View File

@ -8,8 +8,6 @@
#include <pthread.h> #include <pthread.h>
#include <iostream> #include <iostream>
#include <unistd.h> #include <unistd.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
@ -19,36 +17,31 @@
#include <errno.h> #include <errno.h>
#include <signal.h> #include <signal.h>
#include <fcntl.h> /* For O_* constants */ #include <fcntl.h> /* For O_* constants */
#include <sys/stat.h> /* For mode constants */ #include <sys/stat.h> /* For mode constants */
#include <semaphore.h> #include <semaphore.h>
#include <sys/mman.h> #include <sys/mman.h>
#include <sys/stat.h> /* For mode constants */ #include <sys/stat.h> /* For mode constants */
#include <fcntl.h> /* For O_* constants */ #include <fcntl.h> /* For O_* constants */
struct PlatformMutexLinux : public PlatformMutex { struct PlatformMutexLinux : public PlatformMutex {
sem_t* sem_ = nullptr; sem_t* sem_ = nullptr;
PlatformMutexLinux(const std::string& name) { PlatformMutexLinux(const std::string& name) {
std::cerr << "PlatformMutexLinux name=" << name << std::endl; std::cerr << "PlatformMutexLinux name=" << name << std::endl;
sem_ = sem_open(name.c_str(), O_CREAT, 0666 /*permission*/, 1 /*initial_value*/); sem_ = sem_open(name.c_str(), O_CREAT, 0666 /*permission*/,
1 /*initial_value*/);
} }
~PlatformMutexLinux() override { ~PlatformMutexLinux() override { sem_close(sem_); }
sem_close(sem_);
}
}; };
struct PlatformScopedMutexLockLinux : public PlatformScopedMutexLock { struct PlatformScopedMutexLockLinux : public PlatformScopedMutexLock {
sem_t* sem_ = nullptr; sem_t* sem_ = nullptr;
PlatformScopedMutexLockLinux(sem_t* sem) : sem_(sem) { PlatformScopedMutexLockLinux(sem_t* sem) : sem_(sem) { sem_wait(sem_); }
sem_wait(sem_);
}
~PlatformScopedMutexLockLinux() override { ~PlatformScopedMutexLockLinux() override { sem_post(sem_); }
sem_post(sem_);
}
}; };
void* checked(void* result, const char* expr) { void* checked(void* result, const char* expr) {
@ -87,17 +80,18 @@ struct PlatformSharedMemoryLinux : public PlatformSharedMemory {
// Otherwise, we just open existing shared memory. We don't need to // Otherwise, we just open existing shared memory. We don't need to
// create or initialize it. // create or initialize it.
else { else {
fd_ = CHECKED(shm_open(name_.c_str(), fd_ = CHECKED(shm_open(
O_RDWR, /* memory is read/write, create if needed */ name_.c_str(), O_RDWR, /* memory is read/write, create if needed */
S_IRUSR | S_IWUSR /* user read/write */)); S_IRUSR | S_IWUSR /* user read/write */));
} }
// Map the shared memory to an address. // Map the shared memory to an address.
shared = CHECKED(mmap(nullptr /*kernel assigned starting address*/, shared =
shmem_size, PROT_READ | PROT_WRITE, MAP_SHARED, CHECKED(mmap(nullptr /*kernel assigned starting address*/, shmem_size,
fd_, 0 /*offset*/)); PROT_READ | PROT_WRITE, MAP_SHARED, fd_, 0 /*offset*/));
std::cout << "Open shared memory name=" << name << ", fd=" << fd_ << ", shared=" << shared << std::endl; std::cout << "Open shared memory name=" << name << ", fd=" << fd_
<< ", shared=" << shared << std::endl;
} }
~PlatformSharedMemoryLinux() override { ~PlatformSharedMemoryLinux() override {
@ -110,17 +104,19 @@ struct PlatformSharedMemoryLinux : public PlatformSharedMemory {
#undef CHECKED #undef CHECKED
std::unique_ptr<PlatformMutex> CreatePlatformMutex(const std::string& name) { std::unique_ptr<PlatformMutex> CreatePlatformMutex(const std::string& name) {
std::string name2 = "/" + name; std::string name2 = "/" + name;
return MakeUnique<PlatformMutexLinux>(name2); return MakeUnique<PlatformMutexLinux>(name2);
} }
std::unique_ptr<PlatformScopedMutexLock> CreatePlatformScopedMutexLock(PlatformMutex* mutex) { std::unique_ptr<PlatformScopedMutexLock> CreatePlatformScopedMutexLock(
return MakeUnique<PlatformScopedMutexLockLinux>(static_cast<PlatformMutexLinux*>(mutex)->sem_); PlatformMutex* mutex) {
return MakeUnique<PlatformScopedMutexLockLinux>(
static_cast<PlatformMutexLinux*>(mutex)->sem_);
} }
std::unique_ptr<PlatformSharedMemory> CreatePlatformSharedMemory(const std::string& name) { std::unique_ptr<PlatformSharedMemory> CreatePlatformSharedMemory(
const std::string& name) {
std::string name2 = "/" + name; std::string name2 = "/" + name;
return MakeUnique<PlatformSharedMemoryLinux>(name2); return MakeUnique<PlatformSharedMemoryLinux>(name2);
} }

View File

@ -12,19 +12,23 @@ namespace {
DWORD CheckForError(std::vector<DWORD> allow) { DWORD CheckForError(std::vector<DWORD> allow) {
DWORD error = GetLastError(); DWORD error = GetLastError();
if (error == ERROR_SUCCESS || std::find(allow.begin(), allow.end(), error) != allow.end()) if (error == ERROR_SUCCESS ||
std::find(allow.begin(), allow.end(), error) != allow.end())
return error; return error;
// See http://stackoverflow.com/a/17387176 // See http://stackoverflow.com/a/17387176
LPSTR message_buffer = nullptr; LPSTR message_buffer = nullptr;
size_t size = FormatMessageA( size_t size = FormatMessageA(
FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM |
NULL, error, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), (LPSTR)&message_buffer, 0, NULL); FORMAT_MESSAGE_IGNORE_INSERTS,
NULL, error, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
(LPSTR)&message_buffer, 0, NULL);
std::string message(message_buffer, size); std::string message(message_buffer, size);
LocalFree(message_buffer); LocalFree(message_buffer);
std::cerr << "Windows error code=" << error << ", message=" << message << std::endl; std::cerr << "Windows error code=" << error << ", message=" << message
assert(false); // debugger break << std::endl;
assert(false); // debugger break
exit(1); exit(1);
} }
@ -34,7 +38,7 @@ struct PlatformMutexWin : public PlatformMutex {
PlatformMutexWin(const std::string& name) { PlatformMutexWin(const std::string& name) {
std::cerr << "[win] Creating mutex with name " << name << std::endl; std::cerr << "[win] Creating mutex with name " << name << std::endl;
raw_mutex = CreateMutex(nullptr, false /*initial_owner*/, name.c_str()); raw_mutex = CreateMutex(nullptr, false /*initial_owner*/, name.c_str());
CheckForError({ ERROR_ALREADY_EXISTS }); CheckForError({ERROR_ALREADY_EXISTS});
} }
~PlatformMutexWin() override { ~PlatformMutexWin() override {
@ -63,21 +67,16 @@ struct PlatformSharedMemoryWin : public PlatformSharedMemory {
HANDLE shmem_; HANDLE shmem_;
PlatformSharedMemoryWin(const std::string& name, size_t capacity) { PlatformSharedMemoryWin(const std::string& name, size_t capacity) {
std::cerr << "[win] Creating shared memory with name " << name << " and capacity " << capacity << std::endl; std::cerr << "[win] Creating shared memory with name " << name
<< " and capacity " << capacity << std::endl;
this->name = name; this->name = name;
shmem_ = CreateFileMapping( shmem_ = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE, 0,
INVALID_HANDLE_VALUE, capacity, name.c_str());
NULL, CheckForError({ERROR_ALREADY_EXISTS} /*allow*/);
PAGE_READWRITE,
0,
capacity,
name.c_str()
);
CheckForError({ ERROR_ALREADY_EXISTS } /*allow*/);
data = MapViewOfFile(shmem_, FILE_MAP_ALL_ACCESS, 0, 0, capacity); data = MapViewOfFile(shmem_, FILE_MAP_ALL_ACCESS, 0, 0, capacity);
CheckForError({ ERROR_ALREADY_EXISTS } /*allow*/); CheckForError({ERROR_ALREADY_EXISTS} /*allow*/);
this->capacity = capacity; this->capacity = capacity;
} }
@ -97,11 +96,15 @@ std::unique_ptr<PlatformMutex> CreatePlatformMutex(const std::string& name) {
return MakeUnique<PlatformMutexWin>(name); return MakeUnique<PlatformMutexWin>(name);
} }
std::unique_ptr<PlatformScopedMutexLock> CreatePlatformScopedMutexLock(PlatformMutex* mutex) { std::unique_ptr<PlatformScopedMutexLock> CreatePlatformScopedMutexLock(
return MakeUnique<PlatformScopedMutexLockWin>(static_cast<PlatformMutexWin*>(mutex)->raw_mutex); PlatformMutex* mutex) {
return MakeUnique<PlatformScopedMutexLockWin>(
static_cast<PlatformMutexWin*>(mutex)->raw_mutex);
} }
std::unique_ptr<PlatformSharedMemory> CreatePlatformSharedMemory(const std::string& name, size_t size) { std::unique_ptr<PlatformSharedMemory> CreatePlatformSharedMemory(
const std::string& name,
size_t size) {
return MakeUnique<PlatformSharedMemoryWin>(name, size); return MakeUnique<PlatformSharedMemoryWin>(name, size);
} }

View File

@ -27,7 +27,7 @@ void ResizableBuffer::Append(void* content, size_t content_size) {
assert(capacity_ >= 0); assert(capacity_ >= 0);
size_t new_size = size + content_size; size_t new_size = size + content_size;
// Grow buffer capacity if needed. // Grow buffer capacity if needed.
if (new_size >= capacity_) { if (new_size >= capacity_) {
size_t new_capacity = capacity_ * 2; size_t new_capacity = capacity_ * 2;
@ -91,7 +91,7 @@ TEST_CASE("reset does not reallocate") {
while (b.size < kInitialCapacity) while (b.size < kInitialCapacity)
b.Append(&b, sizeof(b)); b.Append(&b, sizeof(b));
void* buffer = b.buffer; void* buffer = b.buffer;
b.Reset(); b.Reset();
REQUIRE(b.buffer == buffer); REQUIRE(b.buffer == buffer);

View File

@ -16,6 +16,6 @@ struct ResizableBuffer {
// than |size|. // than |size|.
size_t size; size_t size;
private: private:
size_t capacity_; size_t capacity_;
}; };