| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338 |
- #include <string>
- #include <fstream>
- #include <stdlib.h>
- #include <algorithm>
- #include "../DirTree.hh"
- #include "../Event.hh"
- #include "./BSER.hh"
- #include "./WatchmanBackend.hh"
- #ifdef _WIN32
- #include "../windows/win_utils.hh"
- #define S_ISDIR(mode) ((mode & _S_IFDIR) == _S_IFDIR)
- #define popen _popen
- #define pclose _pclose
- #else
- #include <sys/stat.h>
- #define normalizePath(dir) dir
- #endif
- template<typename T>
- BSER readBSER(T &&do_read) {
- std::stringstream oss;
- char buffer[256];
- int r;
- int64_t len = -1;
- do {
- // Start by reading a minimal amount of data in order to decode the length.
- // After that, attempt to read the remaining length, up to the buffer size.
- r = do_read(buffer, len == -1 ? 20 : (len < 256 ? len : 256));
- oss << std::string(buffer, r);
- if (len == -1) {
- uint64_t l = BSER::decodeLength(oss);
- len = l + oss.tellg();
- }
- len -= r;
- } while (len > 0);
- return BSER(oss);
- }
- std::string getSockPath() {
- auto var = getenv("WATCHMAN_SOCK");
- if (var && *var) {
- return std::string(var);
- }
- FILE *fp = popen("watchman --output-encoding=bser get-sockname", "r");
- if (fp == NULL || errno == ECHILD) {
- throw std::runtime_error("Failed to execute watchman");
- }
- BSER b = readBSER([fp] (char *buf, size_t len) {
- return fread(buf, sizeof(char), len, fp);
- });
- pclose(fp);
- auto objValue = b.objectValue();
- auto foundSockname = objValue.find("sockname");
- if (foundSockname == objValue.end()) {
- throw std::runtime_error("sockname not found");
- }
- return foundSockname->second.stringValue();
- }
- std::unique_ptr<IPC> watchmanConnect() {
- std::string path = getSockPath();
- return std::unique_ptr<IPC>(new IPC(path));
- }
- BSER watchmanRead(IPC *ipc) {
- return readBSER([ipc] (char *buf, size_t len) {
- return ipc->read(buf, len);
- });
- }
- BSER::Object WatchmanBackend::watchmanRequest(BSER b) {
- std::string cmd = b.encode();
- mIPC->write(cmd);
- mRequestSignal.notify();
- mResponseSignal.wait();
- mResponseSignal.reset();
- if (!mError.empty()) {
- std::runtime_error err = std::runtime_error(mError);
- mError = std::string();
- throw err;
- }
- return mResponse;
- }
- void WatchmanBackend::watchmanWatch(std::string dir) {
- std::vector<BSER> cmd;
- cmd.push_back("watch");
- cmd.push_back(normalizePath(dir));
- watchmanRequest(cmd);
- }
- bool WatchmanBackend::checkAvailable() {
- try {
- watchmanConnect();
- return true;
- } catch (std::exception &err) {
- return false;
- }
- }
- void handleFiles(WatcherRef watcher, BSER::Object obj) {
- auto found = obj.find("files");
- if (found == obj.end()) {
- throw WatcherError("Error reading changes from watchman", watcher);
- }
- auto files = found->second.arrayValue();
- for (auto it = files.begin(); it != files.end(); it++) {
- auto file = it->objectValue();
- auto name = file.find("name")->second.stringValue();
- #ifdef _WIN32
- std::replace(name.begin(), name.end(), '/', '\\');
- #endif
- auto mode = file.find("mode")->second.intValue();
- auto isNew = file.find("new")->second.boolValue();
- auto exists = file.find("exists")->second.boolValue();
- auto path = watcher->mDir + DIR_SEP + name;
- if (watcher->isIgnored(path)) {
- continue;
- }
- if (isNew && exists) {
- watcher->mEvents.create(path);
- } else if (exists && !S_ISDIR(mode)) {
- watcher->mEvents.update(path);
- } else if (!isNew && !exists) {
- watcher->mEvents.remove(path);
- }
- }
- }
- void WatchmanBackend::handleSubscription(BSER::Object obj) {
- std::unique_lock<std::mutex> lock(mMutex);
- auto subscription = obj.find("subscription")->second.stringValue();
- auto it = mSubscriptions.find(subscription);
- if (it == mSubscriptions.end()) {
- return;
- }
- auto watcher = it->second;
- try {
- handleFiles(watcher, obj);
- watcher->notify();
- } catch (WatcherError &err) {
- handleWatcherError(err);
- }
- }
- void WatchmanBackend::start() {
- mIPC = watchmanConnect();
- notifyStarted();
- while (true) {
- // If there are no subscriptions we are reading, wait for a request.
- if (mSubscriptions.size() == 0) {
- mRequestSignal.wait();
- mRequestSignal.reset();
- }
- // Break out of loop if we are stopped.
- if (mStopped) {
- break;
- }
- // Attempt to read from the socket.
- // If there is an error and we are stopped, break.
- BSER b;
- try {
- b = watchmanRead(&*mIPC);
- } catch (std::exception &err) {
- if (mStopped) {
- break;
- } else if (mResponseSignal.isWaiting()) {
- mError = err.what();
- mResponseSignal.notify();
- } else {
- // Throwing causes the backend to be destroyed, but we never reach the code below to notify the signal
- mEndedSignal.notify();
- throw;
- }
- }
- auto obj = b.objectValue();
- auto error = obj.find("error");
- if (error != obj.end()) {
- mError = error->second.stringValue();
- mResponseSignal.notify();
- continue;
- }
- // If this message is for a subscription, handle it, otherwise notify the request.
- auto subscription = obj.find("subscription");
- if (subscription != obj.end()) {
- handleSubscription(obj);
- } else {
- mResponse = obj;
- mResponseSignal.notify();
- }
- }
- mEndedSignal.notify();
- }
- WatchmanBackend::~WatchmanBackend() {
- // Mark the watcher as stopped, close the socket, and trigger the lock.
- // This will cause the read loop to be broken and the thread to exit.
- mStopped = true;
- mIPC.reset();
- mRequestSignal.notify();
- // If not ended yet, wait.
- mEndedSignal.wait();
- }
- std::string WatchmanBackend::clock(WatcherRef watcher) {
- BSER::Array cmd;
- cmd.push_back("clock");
- cmd.push_back(normalizePath(watcher->mDir));
- BSER::Object obj = watchmanRequest(cmd);
- auto found = obj.find("clock");
- if (found == obj.end()) {
- throw WatcherError("Error reading clock from watchman", watcher);
- }
- return found->second.stringValue();
- }
- void WatchmanBackend::writeSnapshot(WatcherRef watcher, std::string *snapshotPath) {
- std::unique_lock<std::mutex> lock(mMutex);
- watchmanWatch(watcher->mDir);
- std::ofstream ofs(*snapshotPath);
- ofs << clock(watcher);
- }
- void WatchmanBackend::getEventsSince(WatcherRef watcher, std::string *snapshotPath) {
- std::unique_lock<std::mutex> lock(mMutex);
- std::ifstream ifs(*snapshotPath);
- if (ifs.fail()) {
- return;
- }
- watchmanWatch(watcher->mDir);
- std::string clock;
- ifs >> clock;
- BSER::Array cmd;
- cmd.push_back("since");
- cmd.push_back(normalizePath(watcher->mDir));
- cmd.push_back(clock);
- BSER::Object obj = watchmanRequest(cmd);
- handleFiles(watcher, obj);
- }
- std::string getId(WatcherRef watcher) {
- std::ostringstream id;
- id << "parcel-";
- id << static_cast<void*>(watcher.get());
- return id.str();
- }
- // This function is called by Backend::watch which takes a lock on mMutex
- void WatchmanBackend::subscribe(WatcherRef watcher) {
- watchmanWatch(watcher->mDir);
- std::string id = getId(watcher);
- BSER::Array cmd;
- cmd.push_back("subscribe");
- cmd.push_back(normalizePath(watcher->mDir));
- cmd.push_back(id);
- BSER::Array fields;
- fields.push_back("name");
- fields.push_back("mode");
- fields.push_back("exists");
- fields.push_back("new");
- BSER::Object opts;
- opts.emplace("fields", fields);
- opts.emplace("since", clock(watcher));
- if (watcher->mIgnorePaths.size() > 0) {
- BSER::Array ignore;
- BSER::Array anyOf;
- anyOf.push_back("anyof");
- for (auto it = watcher->mIgnorePaths.begin(); it != watcher->mIgnorePaths.end(); it++) {
- std::string pathStart = watcher->mDir + DIR_SEP;
- if (it->rfind(pathStart, 0) == 0) {
- auto relative = it->substr(pathStart.size());
- BSER::Array dirname;
- dirname.push_back("dirname");
- dirname.push_back(relative);
- anyOf.push_back(dirname);
- }
- }
- ignore.push_back("not");
- ignore.push_back(anyOf);
- opts.emplace("expression", ignore);
- }
- cmd.push_back(opts);
- watchmanRequest(cmd);
- mSubscriptions.emplace(id, watcher);
- mRequestSignal.notify();
- }
- // This function is called by Backend::unwatch which takes a lock on mMutex
- void WatchmanBackend::unsubscribe(WatcherRef watcher) {
- std::string id = getId(watcher);
- auto erased = mSubscriptions.erase(id);
- if (erased) {
- BSER::Array cmd;
- cmd.push_back("unsubscribe");
- cmd.push_back(normalizePath(watcher->mDir));
- cmd.push_back(id);
- watchmanRequest(cmd);
- }
- }
|