ipc cleanup

This commit is contained in:
Jacob Dufault 2017-03-25 12:18:25 -07:00
parent 174533534d
commit c060e5178b
14 changed files with 693 additions and 1735 deletions

View File

@ -1,2 +1,5 @@
-std=c++11
-Ithird_party/rapidjson/include
-Ithird_party/rapidjson/include
-IC:/Program Files/LLVM/include
-fms-compatibility
-fdelayed-template-parsing

File diff suppressed because it is too large Load Diff

View File

@ -1,5 +1,6 @@
#include "indexer.h"
#include <algorithm>
#include <chrono>
#include "serializer.h"
@ -1204,18 +1205,6 @@ void emptyIndexEntityReference(CXClientData client_data,
IndexedFile Parse(std::string filename,
std::vector<std::string> args,
bool dump_ast) {
// TODO!!
// TODO!!
// TODO!!
// TODO!!: Strip useless defs from IndexedFile before returning
// TODO!!: Strip useless defs from IndexedFile before returning
// TODO!!: Strip useless defs from IndexedFile before returning
// TODO!!: Strip useless defs from IndexedFile before returning
// TODO!!: Strip useless defs from IndexedFile before returning
// TODO!!: Strip useless defs from IndexedFile before returning
// TODO!!
// TODO!!
// TODO!!
clang_toggleCrashRecovery(1);
args.push_back("-std=c++11");

436
ipc.cc
View File

@ -1,436 +0,0 @@
#include "ipc.h"
#include "serializer.h"
#include "utils.h"
#include "third_party/doctest/doctest/doctest.h"
namespace {
// The absolute smallest partial payload we should send. This must be >0, ie, 1 is the
// minimum. Keep a reasonably high value so we don't send needlessly send tiny payloads.
const int kMinimumPartialPayloadSize = 128;
const int kBufferSize = 1024 * 1024 * 32; // number of chars/bytes (32mb) in the message buffer.
// JSON-encoded message that is passed across shared memory.
//
// Messages are funky objects. They contain potentially variable amounts of
// data and are passed between processes. This means that they need to be
// fully relocatable, ie, it is possible to memmove them in memory to a
// completely different address.
struct JsonMessage {
IpcId ipc_id;
int partial_message_id;
bool has_more_chunks;
size_t payload_size;
void* payload() {
return reinterpret_cast<char*>(this) + sizeof(JsonMessage);
}
void Setup(IpcId ipc_id, int partial_message_id, bool has_more_chunks, size_t payload_size, const char* payload) {
this->ipc_id = ipc_id;
this->partial_message_id = partial_message_id;
this->has_more_chunks = has_more_chunks;
this->payload_size = payload_size;
char* payload_dest = reinterpret_cast<char*>(this) + sizeof(JsonMessage);
memcpy(payload_dest, payload, payload_size);
}
};
std::string NameToServerName(const std::string& name) {
return name + "server";
}
std::string NameToClientName(const std::string& name, int client_id) {
return name + "client" + std::to_string(client_id);
}
}
IpcRegistry* IpcRegistry::instance_ = nullptr;
std::unique_ptr<IpcMessage> IpcRegistry::Allocate(IpcId id) {
assert(allocators_);
auto it = allocators_->find(id);
assert(it != allocators_->end() && "No registered allocator for id");
return std::unique_ptr<IpcMessage>(it->second());
}
struct IpcDirectionalChannel::MessageBuffer {
MessageBuffer(void* buffer, size_t buffer_size, bool initialize) {
real_buffer = buffer;
real_buffer_size = buffer_size;
if (initialize)
new(real_buffer) Metadata();
}
// Pointer to the start of the actual buffer and the
// amount of storage actually available.
void* real_buffer;
size_t real_buffer_size;
template<typename T>
T* Offset(size_t offset) const {
return reinterpret_cast<T*>(static_cast<char*>(real_buffer) + offset);
}
struct Metadata {
// The number of bytes that are currently used in the buffer minus the
// size of this Metadata struct.
size_t bytes_used = 0;
int next_partial_message_id = 0;
int num_outstanding_partial_messages = 0;
};
Metadata* metadata() const {
return Offset<Metadata>(0);
}
size_t bytes_available() const {
return real_buffer_size - sizeof(Metadata) - metadata()->bytes_used;
}
JsonMessage* message_at_offset(size_t offset) const {
return Offset<JsonMessage>(sizeof(Metadata) + offset);
}
// First json message.
JsonMessage* first_message() const {
return message_at_offset(0);
}
// First free, writable json message. Make sure to increase *bytes_used()
// by any written size.
JsonMessage* free_message() const {
return message_at_offset(metadata()->bytes_used);
}
struct Iterator {
void* buffer;
size_t remaining_bytes;
Iterator(void* buffer, size_t remaining_bytes) : buffer(buffer), remaining_bytes(remaining_bytes) {}
JsonMessage* get() const {
assert(buffer);
return reinterpret_cast<JsonMessage*>(buffer);
}
JsonMessage* operator*() const {
return get();
}
JsonMessage* operator->() const {
return get();
}
void operator++() {
size_t next_message_offset = sizeof(JsonMessage) + get()->payload_size;
if (next_message_offset >= remaining_bytes) {
assert(next_message_offset == remaining_bytes);
buffer = nullptr;
remaining_bytes = 0;
return;
}
buffer = (char*)buffer + next_message_offset;
remaining_bytes -= next_message_offset;
}
bool operator==(const Iterator& other) const {
return buffer == other.buffer && remaining_bytes == other.remaining_bytes;
}
bool operator!=(const Iterator& other) const {
return !(*this == other);
}
};
Iterator begin() const {
if (metadata()->bytes_used == 0)
return end();
return Iterator(first_message(), metadata()->bytes_used);
}
Iterator end() const {
return Iterator(nullptr, 0);
}
};
struct IpcDirectionalChannel::ResizableBuffer {
void* memory;
size_t size;
size_t capacity;
ResizableBuffer() {
memory = malloc(128);
size = 0;
capacity = 128;
}
~ResizableBuffer() {
free(memory);
size = 0;
capacity = 0;
}
void Append(void* content, size_t content_size) {
assert(capacity);
// Grow memory if needed.
if ((size + content_size) >= capacity) {
size_t new_capacity = capacity * 2;
while (new_capacity < size + content_size)
new_capacity *= 2;
void* new_memory = malloc(new_capacity);
assert(size < capacity);
memcpy(new_memory, memory, size);
free(memory);
memory = new_memory;
capacity = new_capacity;
}
// Append new content into memory.
memcpy((char*)memory + size, content, content_size);
size += content_size;
}
void Reset() {
size = 0;
}
};
IpcDirectionalChannel::ResizableBuffer* IpcDirectionalChannel::CreateOrFindResizableBuffer(int id) {
auto it = resizable_buffers.find(id);
if (it != resizable_buffers.end())
return it->second.get();
return (resizable_buffers[id] = MakeUnique<ResizableBuffer>()).get();
}
void IpcDirectionalChannel::RemoveResizableBuffer(int id) {
resizable_buffers.erase(id);
}
IpcDirectionalChannel::IpcDirectionalChannel(const std::string& name, bool initialize_shared_memory) {
shared = CreatePlatformSharedMemory(name + "memory", kBufferSize);
mutex = CreatePlatformMutex(name + "mutex");
local = std::unique_ptr<char>(new char[kBufferSize]);
// TODO: connecting a client will allocate reset shared state on the
// buffer. We need to store if we "initialized".
shared_buffer = MakeUnique<MessageBuffer>(shared->data, kBufferSize, initialize_shared_memory);
local_buffer = MakeUnique<MessageBuffer>(local.get(), kBufferSize, true /*initialize*/);
}
IpcDirectionalChannel::~IpcDirectionalChannel() {}
enum class DispatchResult {
RunAgain,
Break
};
// Run |action| an arbitrary number of times.
void IpcDispatch(PlatformMutex* mutex, std::function<DispatchResult()> action) {
bool first = true;
int log_iteration_count = 0;
int log_count = 0;
while (true) {
if (!first) {
if (log_iteration_count > 1000) {
log_iteration_count = 0;
std::cerr << "[info]: shmem full, waiting (" << log_count++ << ")" << std::endl; // TODO: remove
}
++log_iteration_count;
// TODO: See if we can figure out a way to use condition variables cross-process.
std::this_thread::sleep_for(std::chrono::microseconds(0));
}
first = false;
std::unique_ptr<PlatformScopedMutexLock> lock = CreatePlatformScopedMutexLock(mutex);
if (action() == DispatchResult::RunAgain)
continue;
break;
}
}
void IpcDirectionalChannel::PushMessage(IpcMessage* message) {
assert(message->ipc_id != IpcId::Invalid);
assert(kBufferSize > sizeof(JsonMessage) + kMinimumPartialPayloadSize);
rapidjson::StringBuffer output;
rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(output);
writer.SetFormatOptions(
rapidjson::PrettyFormatOptions::kFormatSingleLineArray);
writer.SetIndent(' ', 2);
message->Serialize(writer);
//std::cerr << "Sending message with id " << message->runtime_id() << " (hash " << message->hashed_runtime_id() << ")" << std::endl;
size_t payload_size = output.GetSize();
const char* payload = output.GetString();
if (payload_size == 0)
return;
int partial_message_id = 0;
std::cerr << "Starting dispatch of payload with size " << payload_size << std::endl;
int count = 0;
IpcDispatch(mutex.get(), [&]() {
assert(payload_size > 0);
// We cannot find the entire payload in the buffer. We
// have to send chunks of it over time.
if ((sizeof(JsonMessage) + payload_size) > shared_buffer->bytes_available()) {
if ((sizeof(JsonMessage) + kMinimumPartialPayloadSize) > shared_buffer->bytes_available())
return DispatchResult::RunAgain;
if (partial_message_id == 0)
partial_message_id = ++shared_buffer->metadata()->next_partial_message_id; // note: pre-increment so we 1 as initial value
size_t sent_payload_size = shared_buffer->bytes_available() - sizeof(JsonMessage);
shared_buffer->free_message()->Setup(message->ipc_id, partial_message_id, true /*has_more_chunks*/, sent_payload_size, payload);
shared_buffer->metadata()->bytes_used += sizeof(JsonMessage) + sent_payload_size;
//shared_buffer->free_message()->ipc_id = IpcId::Invalid; // Note: free_message() may be past writable memory.
if (count++ > 50) {
std::cerr << "x50 Sending partial message with payload_size=" << sent_payload_size << std::endl;
count = 0;
}
// Prepare for next time.
payload_size -= sent_payload_size;
payload += sent_payload_size;
return DispatchResult::RunAgain;
}
// The entire payload fits. Send it all now.
else {
// Include partial message id, as there could have been previous parts of this payload.
shared_buffer->free_message()->Setup(message->ipc_id, partial_message_id, false /*has_more_chunks*/, payload_size, payload);
shared_buffer->metadata()->bytes_used += sizeof(JsonMessage) + payload_size;
shared_buffer->free_message()->ipc_id = IpcId::Invalid;
//std::cerr << "Sending full message with payload_size=" << payload_size << std::endl;
return DispatchResult::Break;
}
});
}
void AddIpcMessageFromJsonMessage(std::vector<std::unique_ptr<IpcMessage>>& result, IpcId ipc_id, void* payload, size_t payload_size) {
rapidjson::Document document;
document.Parse(reinterpret_cast<const char*>(payload), payload_size);
bool has_error = document.HasParseError();
auto error = document.GetParseError();
std::unique_ptr<IpcMessage> base_message = IpcRegistry::instance()->Allocate(ipc_id);
base_message->Deserialize(document);
result.emplace_back(std::move(base_message));
}
std::vector<std::unique_ptr<IpcMessage>> IpcDirectionalChannel::TakeMessages() {
std::vector<std::unique_ptr<IpcMessage>> result;
do {
// Move data from shared memory into a local buffer. Do this
// before parsing the blocks so that other processes can begin
// posting data as soon as possible.
{
std::unique_ptr<PlatformScopedMutexLock> lock = CreatePlatformScopedMutexLock(mutex.get());
assert(shared_buffer->metadata()->bytes_used <= kBufferSize);
memcpy(local.get(), shared->data, sizeof(MessageBuffer::Metadata) + shared_buffer->metadata()->bytes_used);
shared_buffer->metadata()->bytes_used = 0;
shared_buffer->free_message()->ipc_id = IpcId::Invalid;
}
// Parse blocks from shared memory.
for (JsonMessage* message : *local_buffer) {
//std::cerr << "Got message with payload_size=" << message->payload_size << std::endl;
if (message->partial_message_id != 0) {
auto* buf = CreateOrFindResizableBuffer(message->partial_message_id);
buf->Append(message->payload(), message->payload_size);
if (!message->has_more_chunks) {
AddIpcMessageFromJsonMessage(result, message->ipc_id, buf->memory, buf->size);
RemoveResizableBuffer(message->partial_message_id);
}
}
else {
assert(!message->has_more_chunks);
AddIpcMessageFromJsonMessage(result, message->ipc_id, message->payload(), message->payload_size);
}
}
local_buffer->metadata()->bytes_used = 0;
// Let other threads run. We still want to run as fast as possible, though.
std::this_thread::sleep_for(std::chrono::microseconds(0));
} while (resizable_buffers.size() > 0);
return result;
}
IpcServer::IpcServer(const std::string& name, int num_clients)
: server_(NameToServerName(name), true /*initialize_shared_memory*/) {
for (int i = 0; i < num_clients; ++i) {
clients_.push_back(MakeUnique<IpcDirectionalChannel>(NameToClientName(name, i), true /*initialize_shared_memory*/));
}
}
void IpcServer::SendToClient(int client_id, IpcMessage* message) {
clients_[client_id]->PushMessage(message);
}
std::vector<std::unique_ptr<IpcMessage>> IpcServer::TakeMessages() {
return server_.TakeMessages();
}
IpcClient::IpcClient(const std::string& name, int client_id)
: server_(NameToServerName(name), false /*initialize_shared_memory*/),
client_(NameToClientName(name, client_id), false /*initialize_shared_memory*/) {}
void IpcClient::SendToServer(IpcMessage* message) {
server_.PushMessage(message);
}
std::vector<std::unique_ptr<IpcMessage>> IpcClient::TakeMessages() {
return client_.TakeMessages();
}
template<typename T>
struct TestIpcMessage : IpcMessage {
T data;
TestIpcMessage() : IpcMessage(IpcId::Test) {}
~TestIpcMessage() override {}
// IpcMessage:
void Serialize(Writer& writer) override {
Reflect(writer, data);
}
void Deserialize(Reader& reader) override {
Reflect(reader, data);
}
};
#if false
TEST_CASE("foo") {
IpcRegistry::instance()->Register<TestIpcMessage<std::string>>(IpcId::Test);
IpcDirectionalChannel channel0("indexertestmemory", true /*initialize_shared_memory*/);
IpcDirectionalChannel channel1("indexertestmemory", false /*initialize_shared_memory*/);
TestIpcMessage<std::string> m;
m.data = "hey there";
channel0.PushMessage(&m);
std::vector<std::unique_ptr<IpcMessage>> messages = channel1.TakeMessages();
REQUIRE(messages.size() == 1);
REQUIRE(messages[0]->ipc_id == m.ipc_id);
REQUIRE(reinterpret_cast<TestIpcMessage<std::string>*>(messages[0].get())->data == m.data);
}
#endif

155
ipc.h
View File

@ -1,155 +0,0 @@
#pragma once
#include <iostream>
#include <chrono>
#include <string>
#include <thread>
#include <unordered_map>
#include <rapidjson/document.h>
#include <rapidjson/prettywriter.h>
#include "src/platform.h"
#include "serializer.h"
#include "utils.h"
// TODO: We need to add support for payloads larger than the maximum shared memory buffer size.
enum class IpcId : int {
// Invalid request id.
Invalid = 0,
Quit = 1,
IsAlive,
OpenProject,
IndexTranslationUnitRequest,
IndexTranslationUnitResponse,
// This is a language server request. The actual request method
// id is embedded within the request state.
LanguageServerRequest,
// TODO: remove
DocumentSymbolsRequest,
DocumentSymbolsResponse,
DocumentCodeLensRequest,
DocumentCodeLensResponse,
CodeLensResolveRequest,
CodeLensResolveResponse,
WorkspaceSymbolsRequest,
WorkspaceSymbolsResponse,
Test
};
namespace std {
template <>
struct hash<IpcId> {
size_t operator()(const IpcId& k) const {
return hash<int>()(static_cast<int>(k));
}
};
}
struct IpcMessage {
IpcMessage(IpcId ipc_id) : ipc_id(ipc_id) {}
virtual ~IpcMessage() {}
const IpcId ipc_id;
virtual void Serialize(Writer& writer) = 0;
virtual void Deserialize(Reader& reader) = 0;
};
struct IpcRegistry {
using Allocator = std::function<IpcMessage*()>;
// Use unique_ptrs so we can initialize on first use
// (static init order might not be right).
std::unique_ptr<std::unordered_map<IpcId, Allocator>> allocators_;
template<typename T>
void Register(IpcId id);
std::unique_ptr<IpcMessage> Allocate(IpcId id);
static IpcRegistry* instance() {
if (!instance_)
instance_ = new IpcRegistry();
return instance_;
}
static IpcRegistry* instance_;
};
template<typename T>
void IpcRegistry::Register(IpcId id) {
if (!allocators_)
allocators_ = MakeUnique<std::unordered_map<IpcId, Allocator>>();
assert(allocators_->find(id) == allocators_->end() &&
"There is already an IPC message with the given id");
(*allocators_)[id] = [id]() {
return new T();
};
}
struct IpcDirectionalChannel {
// NOTE: We keep all pointers in terms of char* so pointer arithmetic is
// always relative to bytes.
explicit IpcDirectionalChannel(const std::string& name, bool initialize_shared_memory);
~IpcDirectionalChannel();
void PushMessage(IpcMessage* message);
std::vector<std::unique_ptr<IpcMessage>> TakeMessages();
struct MessageBuffer;
struct ResizableBuffer;
ResizableBuffer* CreateOrFindResizableBuffer(int id);
void RemoveResizableBuffer(int id);
std::unordered_map<int, std::unique_ptr<ResizableBuffer>> resizable_buffers;
// Pointer to process shared memory and process shared mutex.
std::unique_ptr<PlatformSharedMemory> shared;
std::unique_ptr<PlatformMutex> mutex;
// Pointer to process-local memory.
std::unique_ptr<char> local;
std::unique_ptr<MessageBuffer> shared_buffer;
std::unique_ptr<MessageBuffer> local_buffer;
};
struct IpcServer {
IpcServer(const std::string& name, int num_clients);
void SendToClient(int client_id, IpcMessage* message);
std::vector<std::unique_ptr<IpcMessage>> TakeMessages();
int num_clients() const { return clients_.size(); }
private:
IpcDirectionalChannel server_; // Local / us.
std::vector<std::unique_ptr<IpcDirectionalChannel>> clients_;
};
struct IpcClient {
IpcClient(const std::string& name, int client_id);
void SendToServer(IpcMessage* message);
std::vector<std::unique_ptr<IpcMessage>> TakeMessages();
IpcDirectionalChannel* client() { return &client_; }
private:
IpcDirectionalChannel server_;
IpcDirectionalChannel client_;
};

View File

@ -1,6 +1,7 @@
#pragma once
#include <iostream>
#include <sstream>
#include <unordered_map>
#include <unordered_set>
@ -28,6 +29,12 @@ enum class lsMethodId : int {
TextDocumentCodeLens,
CodeLensResolve,
WorkspaceSymbol,
// Internal implementation detail.
Quit,
IsAlive,
OpenProject,
Cout
};
MAKE_ENUM_HASHABLE(lsMethodId);
@ -140,14 +147,16 @@ struct MessageRegistry {
static MessageRegistry* instance_;
static MessageRegistry* instance();
using Allocator = std::function<std::unique_ptr<InMessage>(optional<RequestId> id, Reader& params)>;
using Allocator = std::function<std::unique_ptr<InMessage>(Reader& visitor)>;
std::unordered_map<std::string, Allocator> allocators;
template<typename T>
void Register() {
std::string method_name = MethodIdToString(T::kMethod);
allocators[method_name] = [](optional<RequestId> id, Reader& params) {
return MakeUnique<T>(id, params);
allocators[method_name] = [](Reader& visitor) {
auto result = MakeUnique<T>();
Reflect(visitor, *result);
return result;
};
}
@ -156,9 +165,6 @@ struct MessageRegistry {
if (jsonrpc != "2.0")
exit(1);
optional<RequestId> id;
ReflectMember(visitor, "id", id);
std::string method;
ReflectMember(visitor, "method", method);
@ -168,20 +174,7 @@ struct MessageRegistry {
}
Allocator& allocator = allocators[method];
// We run the allocator with actual params object or a null
// params object if there are no params. Unifying the two ifs is
// tricky because the second allocator param is a reference.
if (visitor.FindMember("params") != visitor.MemberEnd()) {
Reader& params = visitor["params"];
return allocator(id, params);
}
else {
Reader params;
params.SetNull();
return allocator(id, params);
}
return allocator(visitor);
}
};
@ -197,8 +190,6 @@ struct lsBaseMessage {};
struct InMessage : public lsBaseMessage {
const lsMethodId method_id;
optional<RequestId> id;
InMessage(lsMethodId method_id) : method_id(method_id) {}
};
@ -216,7 +207,7 @@ struct OutMessage : public lsBaseMessage {
virtual void WriteMessageBody(Writer& writer) = 0;
// Send the message to the language client by writing it to stdout.
void Send() {
void Send(std::ostream& out) {
rapidjson::StringBuffer output;
Writer writer(output);
writer.StartObject();
@ -225,10 +216,10 @@ struct OutMessage : public lsBaseMessage {
WriteMessageBody(writer);
writer.EndObject();
std::cout << "Content-Length: " << output.GetSize();
std::cout << (char)13 << char(10) << char(13) << char(10);
std::cout << output.GetString();
std::cout.flush();
out << "Content-Length: " << output.GetSize();
out << (char)13 << char(10) << char(13) << char(10);
out << output.GetString();
out.flush();
}
};
@ -346,6 +337,8 @@ struct OutNotificationMessage : public OutMessage {
struct In_CancelRequest : public InNotificationMessage {
static const lsMethodId kMethod = lsMethodId::CancelRequest;
RequestId id;
In_CancelRequest() : InNotificationMessage(kMethod) {}
};
@ -1282,6 +1275,8 @@ void Reflect(TVisitor& visitor, lsInitializeResult& value) {
struct In_InitializeRequest : public InRequestMessage {
const static lsMethodId kMethod = lsMethodId::Initialize;
RequestId id;
lsInitializeParams params;
In_InitializeRequest() : InRequestMessage(kMethod) {}
@ -1306,6 +1301,8 @@ struct Out_InitializeResponse : public OutResponseMessage {
struct In_InitializedNotification : public InNotificationMessage {
const static lsMethodId kMethod = lsMethodId::Initialized;
RequestId id;
In_InitializedNotification() : InNotificationMessage(kMethod) {}
};
@ -1375,6 +1372,7 @@ void Reflect(TVisitor& visitor, lsDocumentSymbolParams& value) {
struct In_DocumentSymbolRequest : public InRequestMessage {
const static lsMethodId kMethod = lsMethodId::TextDocumentDocumentSymbol;
RequestId id;
lsDocumentSymbolParams params;
In_DocumentSymbolRequest() : InRequestMessage(kMethod) {}
@ -1448,6 +1446,7 @@ using TCodeLens = lsCodeLens<lsCodeLensUserData, lsCodeLensCommandArguments>;
struct In_DocumentCodeLensRequest : public InRequestMessage {
const static lsMethodId kMethod = lsMethodId::TextDocumentCodeLens;
RequestId id;
lsDocumentCodeLensParams params;
In_DocumentCodeLensRequest() : InRequestMessage(kMethod) {}
@ -1473,6 +1472,7 @@ struct Out_DocumentCodeLensResponse : public OutResponseMessage {
struct In_DocumentCodeLensResolveRequest : public InRequestMessage {
const static lsMethodId kMethod = lsMethodId::CodeLensResolve;
RequestId id;
TCodeLens params;
In_DocumentCodeLensResolveRequest() : InRequestMessage(kMethod) {}
@ -1516,6 +1516,7 @@ void Reflect(TVisitor& visitor, lsWorkspaceSymbolParams& value) {
struct In_WorkspaceSymbolRequest : public InRequestMessage {
const static lsMethodId kMethod = lsMethodId::WorkspaceSymbol;
RequestId id;
lsWorkspaceSymbolParams params;
In_WorkspaceSymbolRequest() : InRequestMessage(kMethod) {}

View File

@ -4,10 +4,11 @@
#include <fstream>
#include <sstream>
#include <cassert>
#include <iostream>
namespace clang {
/*
/*
TranslationUnit::TranslationUnit(Index &index, const std::string &file_path,
const std::vector<std::string> &command_line_args,
const std::string &buffer, unsigned flags) {
@ -34,12 +35,27 @@ TranslationUnit::TranslationUnit(Index &index, const std::string &file_path,
std::vector<const char*> args;
for (const std::string& a : command_line_args) {
//if (a.size() >= 2 && a[0] == '-' && a[1] == 'D')
args.push_back(a.c_str());
args.push_back(a.c_str());
}
CXErrorCode error_code = clang_parseTranslationUnit2(
index.cx_index, file_path.c_str(), args.data(), args.size(), nullptr, 0, flags, &cx_tu);
assert(!error_code);
switch (error_code) {
case CXError_Success:
break;
case CXError_Failure:
std::cerr << "libclang generic failure for " << file_path << std::endl;
break;
case CXError_Crashed:
std::cerr << "libclang crashed for " << file_path << std::endl;
break;
case CXError_InvalidArguments:
std::cerr << "libclang had invalid arguments for " << file_path << std::endl;
break;
case CXError_ASTReadError:
std::cerr << "libclang had ast read error for " << file_path << std::endl;
break;
}
}
TranslationUnit::~TranslationUnit() {

175
old.cc
View File

@ -1,175 +0,0 @@
#if false
/*
// Connects to a running --project-directory instance. Forks
// and creates it if not running.
//
// Implements language server spec.
indexer.exe --language-server
// Holds the runtime db that the --language-server instance
// runs queries against.
indexer.exe --project-directory /work2/chrome/src
// Created from the --project-directory (server) instance
indexer.exe --index-file /work2/chrome/src/chrome/foo.cc
// Configuration data is read from a JSON file.
{
"max_threads": 40,
"cache_directory": "/work/indexer_cache/"
}
*/
bool ParsePreferredSymbolLocation(const std::string& content, PreferredSymbolLocation* obj) {
#define PARSE_AS(name, string) \
if (content == #string) { \
*obj = name; \
return true; \
}
PARSE_AS(PreferredSymbolLocation::Declaration, "declaration");
PARSE_AS(PreferredSymbolLocation::Definition, "definition");
return false;
#undef PARSE_AS
}
bool ParseCommand(const std::string& content, Command* obj) {
#define PARSE_AS(name, string) \
if (content == #string) { \
*obj = name; \
return true; \
}
PARSE_AS(Command::Callees, "callees");
PARSE_AS(Command::Callers, "callers");
PARSE_AS(Command::FindAllUsages, "find-all-usages");
PARSE_AS(Command::FindInterestingUsages, "find-interesting-usages");
PARSE_AS(Command::GotoReferenced, "goto-referenced");
PARSE_AS(Command::Hierarchy, "hierarchy");
PARSE_AS(Command::Outline, "outline");
PARSE_AS(Command::Search, "search");
return false;
#undef PARSE_AS
}
int main(int argc, char** argv) {
if (argc == 1 || options.find("--help") != options.end()) {
std::cout << R"help(clang-indexer help:
General:
--help Print this help information.
--help-commands
Print all available query commands.
--project Path to compile_commands.json. Needed for the server, and
optionally by clients if there are multiple servers running.
--print-config
Emit all configuration data this executable is using.
Server:
--server If present, this binary will run in server mode. The binary
will not return until killed or an exit is requested. The
server computes and caches an index of the entire program
which is then queried by short-lived client processes. A
client is created by running this binary with a --command
flag.
--cache-dir Directory to cache the index and other useful information. If
a previous cache is present, the database will try to reuse
it. If this flag is not present, the database will be
in-memory only.
--threads Number of threads to use for indexing and querying tasks.
This value is optional; a good estimate is computed by
default.
Client:
--command Execute a query command against the index. See
--command-help for a listing of valid commands and a
description of what they do. Presence of this flag indicates
that the indexer is in client mode; this flag is mutually
exclusive with --server.
--location Location of the query. Some commands require only a file,
other require a line and column as well. Format is
filename[:line:column]. For example, "foobar.cc" and
"foobar.cc:1:10" are valid inputs.
--preferred-symbol-location
When looking up symbols, try to return either the
'declaration' or the 'definition'. Defaults to 'definition'.
)help";
exit(0);
}
if (HasOption(options, "--help-commands")) {
std::cout << R"(Available commands:
callees:
callers:
Emit all functions (with location) that this function calls ("callees") or
that call this function ("callers"). Requires a location.
find-all-usages:
Emit every usage of the given symbol. This is intended to support a rename
refactoring. This output contains many uninteresting usages of symbols;
prefer find-interesting-usges. Requires a location.
find-interesting-usages:
Emit only usages of the given symbol which are semantically interesting.
Requires a location.
goto-referenced:
Find an associated reference (either definition or declaration) for the
given symbol. Requires a location.
hierarchy:
List the type hierarchy (ie, inherited and derived members) for the given
method or type. Requires a location.
outline:
Emit a file outline, listing all of the symbols in the file.
search:
Search for a symbol by name.
)";
exit(0);
}
if (HasOption(options, "--project")) {
std::vector<CompilationEntry> entries = LoadCompilationEntriesFromDirectory(options["--project"]);
for (const CompilationEntry& entry : entries) {
std::cout << "Parsing " << entry.filename << std::endl;
QueryableDatabase db;
IndexedFile file = Parse(entry.filename, entry.args);
IndexUpdate update(file);
db.ApplyIndexUpdate(&update);
//std::cout << db.ToString() << std::endl << std::endl;
}
std::cin.get();
exit(0);
}
if (HasOption(options, "--command")) {
Command command;
if (!ParseCommand(options["--command"], &command))
Fail("Unknown command \"" + options["--command"] + "\"; see --help-commands");
}
std::cout << "Invalid arguments. Try --help.";
exit(1);
return 0;
}
#endif

View File

@ -343,12 +343,18 @@ void CompareGroups(
IndexUpdate::IndexUpdate(IndexedFile& file) {
files_added.push_back(QueryableFile(file));
for (const IndexedTypeDef& def : file.types)
for (const IndexedTypeDef& def : file.types) {
if (def.is_bad_def) continue;
types_added.push_back(QueryableTypeDef(file.id_cache, def));
for (const IndexedFuncDef& def : file.funcs)
}
for (const IndexedFuncDef& def : file.funcs) {
if (def.is_bad_def) continue;
funcs_added.push_back(QueryableFuncDef(file.id_cache, def));
for (const IndexedVarDef& def : file.vars)
}
for (const IndexedVarDef& def : file.vars) {
if (def.is_bad_def) continue;
vars_added.push_back(QueryableVarDef(file.id_cache, def));
}
}
IndexUpdate::IndexUpdate(IndexedFile& previous_file, IndexedFile& current_file) {

51
src/threaded_queue.h Normal file
View File

@ -0,0 +1,51 @@
#pragma once
// TODO: cleanup includes.
#include <algorithm>
#include <queue>
#include <mutex>
#include <condition_variable>
#include "../optional.h"
// A threadsafe-queue. http://stackoverflow.com/a/16075550
template <class T>
class ThreadedQueue {
public:
// Add an element to the queue.
void Enqueue(T t) {
std::lock_guard<std::mutex> lock(mutex_);
queue_.push(t);
cv_.notify_one();
}
// Get the "front"-element.
// If the queue is empty, wait till a element is avaiable.
T Dequeue() {
std::unique_lock<std::mutex> lock(mutex_);
while (queue_.empty()) {
// release lock as long as the wait and reaquire it afterwards.
cv_.wait(lock);
}
T val = queue_.front();
queue_.pop();
return val;
}
// Get the "front"-element.
// Returns empty if the queue is empty.
optional<T> TryDequeue() {
std::unique_lock<std::mutex> lock(mutex_);
if (queue_.empty())
return nullopt;
T val = queue_.front();
queue_.pop();
return val;
}
private:
std::queue<T> queue_;
mutable std::mutex mutex_;
std::condition_variable cv_;
};

14
src/timer.cc Normal file
View File

@ -0,0 +1,14 @@
#include "timer.h"
Timer::Timer() {
Reset();
}
void Timer::Reset() {
start = Clock::now();
}
long long Timer::ElapsedMilliseconds() {
std::chrono::time_point<Clock> end = Clock::now();
return std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
}

19
src/timer.h Normal file
View File

@ -0,0 +1,19 @@
#pragma once
#include <chrono>
struct Timer {
using Clock = std::chrono::high_resolution_clock;
// Creates a new timer. A timer is always running.
Timer();
// Restart/reset the timer.
void Reset();
// Return the number of milliseconds since the timer was last reset.
long long ElapsedMilliseconds();
// Raw start time.
std::chrono::time_point<Clock> start;
};

View File

@ -35,17 +35,16 @@ struct TypedBidiMessageQueue {
deserializers_[id] = deserializer;
}
void SendMessage(MessageQueue* destination, TId id, TMessage* message) {
void SendMessage(MessageQueue* destination, TId id, TMessage& message) {
// Create writer.
rapidjson::StringBuffer output;
rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(output);
writer.SetIndent(' ', 0);
// Serialize the message.
assert(serializers_.find(message->id) != serializers_.end() &&
"No registered serializer");
assert(serializers_.find(id) != serializers_.end() && "No registered serializer");
const Serializer& serializer = serializers_.find(id)->second;
serializer(writer, *message);
serializer(writer, message);
// Send message.
void* payload = malloc(sizeof(MessageHeader) + output.GetSize());

224
task.cc
View File

@ -1,224 +0,0 @@
#include <cassert>
#include <condition_variable>
#include <iostream>
#include <thread>
#include <vector>
#include "compilation_database_loader.h"
#include "indexer.h"
#include "query.h"
#include "optional.h"
#include "utils.h"
//#include "third_party/tiny-process-library/process.hpp"
#include <algorithm>
#include <queue>
#include <mutex>
#include <condition_variable>
using std::experimental::optional;
using std::experimental::nullopt;
// A threadsafe-queue. http://stackoverflow.com/a/16075550
template <class T>
class SafeQueue {
public:
// Add an element to the queue.
void enqueue(T t) {
std::lock_guard<std::mutex> lock(mutex_);
queue_.push(t);
cv_.notify_one();
}
// Get the "front"-element.
// If the queue is empty, wait till a element is avaiable.
T dequeue() {
std::unique_lock<std::mutex> lock(mutex_);
while (queue_.empty()) {
// release lock as long as the wait and reaquire it afterwards.
cv_.wait(lock);
}
T val = queue_.front();
queue_.pop();
return val;
}
// Get the "front"-element.
// Returns empty if the queue is empty.
optional<T> try_dequeue() {
std::unique_lock<std::mutex> lock(mutex_);
if (queue_.empty())
return nullopt;
T val = queue_.front();
queue_.pop();
return val;
}
private:
std::queue<T> queue_;
mutable std::mutex mutex_;
std::condition_variable cv_;
};
struct Task {
int priority = 0;
bool writes_to_index = false;
enum class Kind {
CreateIndex,
IndexImport,
Exit
};
Kind kind;
struct CreateIndexState {
CompilationEntry data;
};
struct IndexImportState {
std::string path;
};
struct ExitState {};
// TODO: Move into a union?
CreateIndexState create_index;
IndexImportState index_import;
ExitState exit;
static Task MakeExit() {
Task task;
task.kind = Kind::Exit;
return task;
}
static Task MakeCreateIndexTask(CompilationEntry compilation_entry) {
Task task;
task.kind = Kind::CreateIndex;
task.create_index.data = compilation_entry;
return task;
}
static Task MakeIndexImportTask(std::string filename) {
Task task;
task.kind = Kind::IndexImport;
task.index_import.path = filename;
return task;
}
// TODO: Create index task.
// Task running in a separate process, parsing a file into something we can
// import.
// TODO: Index import task.
// Completed parse task that wants to import content into the global database.
// Runs in main process, primary thread. Stops all other threads.
// TODO: Index fresh task.
// Completed parse task that wants to update content previously imported into
// the global database. Runs in main process, primary thread. Stops all other
// threads.
//
// Note that this task just contains a set of operations to apply to the global
// database. The operations come from a diff based on the previously indexed
// state in comparison to the newly indexed state.
//
// TODO: We may be able to run multiple freshen and import tasks in parallel if
// we restrict what ranges of the db they may change.
// TODO: QueryTask
// Task running a query against the global database. Run in main process,
// separate thread.
//Command query;
//Location location;
//std::string argument;
};
struct Config {
// Cache directory. Always ends with /
std::string cache_directory;
};
// NOTE: When something enters a value into master db, it will have to have a
// ref count, since multiple parsings could enter it (unless we require
// that it be defined in that declaration unit!)
struct TaskManager {
SafeQueue<Task> queued_tasks;
// Available threads.
std::vector<std::thread> threads;
TaskManager(int num_threads, Config* config);
};
void PostTaskToIndexer(TaskManager* tm, Task task) {
tm->queued_tasks.enqueue(task);
}
void RunIndexTask(Config* config, TaskManager* tm, CompilationEntry entry) {
IndexedFile file = Parse(entry.filename, entry.args);
std::string cleaned_file_path = entry.directory + "/" + entry.filename;
std::replace(cleaned_file_path.begin(), cleaned_file_path.end(), '/', '_');
std::replace(cleaned_file_path.begin(), cleaned_file_path.end(), '\\', '_');
std::string filename = config->cache_directory + cleaned_file_path;
WriteToFile(filename, file.ToString());
PostTaskToIndexer(tm, Task::MakeIndexImportTask(filename));
}
void LoadProject(Config* config, TaskManager* tm, std::vector<CompilationEntry> entries) {
for (CompilationEntry entry : entries) {
tm->queued_tasks.enqueue(Task::MakeCreateIndexTask(entry));
}
}
static void ThreadMain(int id, Config* config, TaskManager* tm) {
while (true) {
Task task = tm->queued_tasks.dequeue();
switch (task.kind) {
case Task::Kind::CreateIndex:
RunIndexTask(config, tm, task.create_index.data);
break;
case Task::Kind::IndexImport:
assert(false);
break;
case Task::Kind::Exit:
std::cerr << id << ": Exiting" << std::endl;
return;
}
std::cerr << id << ": waking" << std::endl;
}
}
TaskManager::TaskManager(int num_threads, Config* config) {
for (int i = 0; i < num_threads; ++i) {
threads.push_back(std::thread(&ThreadMain, i, config, this));
}
}
void Pump(TaskManager* tm) {
//tm->threads[0].
}
int main252525225(int argc, char** argv) {
Config config;
TaskManager tm(5, &config);
LoadProject(&config, &tm, LoadCompilationEntriesFromDirectory("full_tests/simple_cross_reference"));
// TODO: looks like we will have to write shared memory support.
// TODO: We signal thread to pick data, thread signals data pick is done.
// Repeat until we encounter a writer, wait for all threads to signal
// they are done.
// TODO: Let's use a thread safe queue/vector/etc instead.
//for (int i = 0; i < 10; ++i)
// tm.queued_tasks.enqueue(Task::MakeExit());
for (std::thread& thread : tm.threads)
thread.join();
std::cin.get();
return 0;
}