0fd97077227da981312e7750e7155dac954483dc950c9dc9d3a9117ac47cd9f629bfda626717b3d11bd9e225fa4236dac757864776d291e5b6f554f5ae0db2 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338
  1. #include <string>
  2. #include <fstream>
  3. #include <stdlib.h>
  4. #include <algorithm>
  5. #include "../DirTree.hh"
  6. #include "../Event.hh"
  7. #include "./BSER.hh"
  8. #include "./WatchmanBackend.hh"
  9. #ifdef _WIN32
  10. #include "../windows/win_utils.hh"
  11. #define S_ISDIR(mode) ((mode & _S_IFDIR) == _S_IFDIR)
  12. #define popen _popen
  13. #define pclose _pclose
  14. #else
  15. #include <sys/stat.h>
  16. #define normalizePath(dir) dir
  17. #endif
  18. template<typename T>
  19. BSER readBSER(T &&do_read) {
  20. std::stringstream oss;
  21. char buffer[256];
  22. int r;
  23. int64_t len = -1;
  24. do {
  25. // Start by reading a minimal amount of data in order to decode the length.
  26. // After that, attempt to read the remaining length, up to the buffer size.
  27. r = do_read(buffer, len == -1 ? 20 : (len < 256 ? len : 256));
  28. oss << std::string(buffer, r);
  29. if (len == -1) {
  30. uint64_t l = BSER::decodeLength(oss);
  31. len = l + oss.tellg();
  32. }
  33. len -= r;
  34. } while (len > 0);
  35. return BSER(oss);
  36. }
  37. std::string getSockPath() {
  38. auto var = getenv("WATCHMAN_SOCK");
  39. if (var && *var) {
  40. return std::string(var);
  41. }
  42. FILE *fp = popen("watchman --output-encoding=bser get-sockname", "r");
  43. if (fp == NULL || errno == ECHILD) {
  44. throw std::runtime_error("Failed to execute watchman");
  45. }
  46. BSER b = readBSER([fp] (char *buf, size_t len) {
  47. return fread(buf, sizeof(char), len, fp);
  48. });
  49. pclose(fp);
  50. auto objValue = b.objectValue();
  51. auto foundSockname = objValue.find("sockname");
  52. if (foundSockname == objValue.end()) {
  53. throw std::runtime_error("sockname not found");
  54. }
  55. return foundSockname->second.stringValue();
  56. }
  57. std::unique_ptr<IPC> watchmanConnect() {
  58. std::string path = getSockPath();
  59. return std::unique_ptr<IPC>(new IPC(path));
  60. }
  61. BSER watchmanRead(IPC *ipc) {
  62. return readBSER([ipc] (char *buf, size_t len) {
  63. return ipc->read(buf, len);
  64. });
  65. }
  66. BSER::Object WatchmanBackend::watchmanRequest(BSER b) {
  67. std::string cmd = b.encode();
  68. mIPC->write(cmd);
  69. mRequestSignal.notify();
  70. mResponseSignal.wait();
  71. mResponseSignal.reset();
  72. if (!mError.empty()) {
  73. std::runtime_error err = std::runtime_error(mError);
  74. mError = std::string();
  75. throw err;
  76. }
  77. return mResponse;
  78. }
  79. void WatchmanBackend::watchmanWatch(std::string dir) {
  80. std::vector<BSER> cmd;
  81. cmd.push_back("watch");
  82. cmd.push_back(normalizePath(dir));
  83. watchmanRequest(cmd);
  84. }
  85. bool WatchmanBackend::checkAvailable() {
  86. try {
  87. watchmanConnect();
  88. return true;
  89. } catch (std::exception &err) {
  90. return false;
  91. }
  92. }
  93. void handleFiles(WatcherRef watcher, BSER::Object obj) {
  94. auto found = obj.find("files");
  95. if (found == obj.end()) {
  96. throw WatcherError("Error reading changes from watchman", watcher);
  97. }
  98. auto files = found->second.arrayValue();
  99. for (auto it = files.begin(); it != files.end(); it++) {
  100. auto file = it->objectValue();
  101. auto name = file.find("name")->second.stringValue();
  102. #ifdef _WIN32
  103. std::replace(name.begin(), name.end(), '/', '\\');
  104. #endif
  105. auto mode = file.find("mode")->second.intValue();
  106. auto isNew = file.find("new")->second.boolValue();
  107. auto exists = file.find("exists")->second.boolValue();
  108. auto path = watcher->mDir + DIR_SEP + name;
  109. if (watcher->isIgnored(path)) {
  110. continue;
  111. }
  112. if (isNew && exists) {
  113. watcher->mEvents.create(path);
  114. } else if (exists && !S_ISDIR(mode)) {
  115. watcher->mEvents.update(path);
  116. } else if (!isNew && !exists) {
  117. watcher->mEvents.remove(path);
  118. }
  119. }
  120. }
  121. void WatchmanBackend::handleSubscription(BSER::Object obj) {
  122. std::unique_lock<std::mutex> lock(mMutex);
  123. auto subscription = obj.find("subscription")->second.stringValue();
  124. auto it = mSubscriptions.find(subscription);
  125. if (it == mSubscriptions.end()) {
  126. return;
  127. }
  128. auto watcher = it->second;
  129. try {
  130. handleFiles(watcher, obj);
  131. watcher->notify();
  132. } catch (WatcherError &err) {
  133. handleWatcherError(err);
  134. }
  135. }
  136. void WatchmanBackend::start() {
  137. mIPC = watchmanConnect();
  138. notifyStarted();
  139. while (true) {
  140. // If there are no subscriptions we are reading, wait for a request.
  141. if (mSubscriptions.size() == 0) {
  142. mRequestSignal.wait();
  143. mRequestSignal.reset();
  144. }
  145. // Break out of loop if we are stopped.
  146. if (mStopped) {
  147. break;
  148. }
  149. // Attempt to read from the socket.
  150. // If there is an error and we are stopped, break.
  151. BSER b;
  152. try {
  153. b = watchmanRead(&*mIPC);
  154. } catch (std::exception &err) {
  155. if (mStopped) {
  156. break;
  157. } else if (mResponseSignal.isWaiting()) {
  158. mError = err.what();
  159. mResponseSignal.notify();
  160. } else {
  161. // Throwing causes the backend to be destroyed, but we never reach the code below to notify the signal
  162. mEndedSignal.notify();
  163. throw;
  164. }
  165. }
  166. auto obj = b.objectValue();
  167. auto error = obj.find("error");
  168. if (error != obj.end()) {
  169. mError = error->second.stringValue();
  170. mResponseSignal.notify();
  171. continue;
  172. }
  173. // If this message is for a subscription, handle it, otherwise notify the request.
  174. auto subscription = obj.find("subscription");
  175. if (subscription != obj.end()) {
  176. handleSubscription(obj);
  177. } else {
  178. mResponse = obj;
  179. mResponseSignal.notify();
  180. }
  181. }
  182. mEndedSignal.notify();
  183. }
  184. WatchmanBackend::~WatchmanBackend() {
  185. // Mark the watcher as stopped, close the socket, and trigger the lock.
  186. // This will cause the read loop to be broken and the thread to exit.
  187. mStopped = true;
  188. mIPC.reset();
  189. mRequestSignal.notify();
  190. // If not ended yet, wait.
  191. mEndedSignal.wait();
  192. }
  193. std::string WatchmanBackend::clock(WatcherRef watcher) {
  194. BSER::Array cmd;
  195. cmd.push_back("clock");
  196. cmd.push_back(normalizePath(watcher->mDir));
  197. BSER::Object obj = watchmanRequest(cmd);
  198. auto found = obj.find("clock");
  199. if (found == obj.end()) {
  200. throw WatcherError("Error reading clock from watchman", watcher);
  201. }
  202. return found->second.stringValue();
  203. }
  204. void WatchmanBackend::writeSnapshot(WatcherRef watcher, std::string *snapshotPath) {
  205. std::unique_lock<std::mutex> lock(mMutex);
  206. watchmanWatch(watcher->mDir);
  207. std::ofstream ofs(*snapshotPath);
  208. ofs << clock(watcher);
  209. }
  210. void WatchmanBackend::getEventsSince(WatcherRef watcher, std::string *snapshotPath) {
  211. std::unique_lock<std::mutex> lock(mMutex);
  212. std::ifstream ifs(*snapshotPath);
  213. if (ifs.fail()) {
  214. return;
  215. }
  216. watchmanWatch(watcher->mDir);
  217. std::string clock;
  218. ifs >> clock;
  219. BSER::Array cmd;
  220. cmd.push_back("since");
  221. cmd.push_back(normalizePath(watcher->mDir));
  222. cmd.push_back(clock);
  223. BSER::Object obj = watchmanRequest(cmd);
  224. handleFiles(watcher, obj);
  225. }
  226. std::string getId(WatcherRef watcher) {
  227. std::ostringstream id;
  228. id << "parcel-";
  229. id << static_cast<void*>(watcher.get());
  230. return id.str();
  231. }
  232. // This function is called by Backend::watch which takes a lock on mMutex
  233. void WatchmanBackend::subscribe(WatcherRef watcher) {
  234. watchmanWatch(watcher->mDir);
  235. std::string id = getId(watcher);
  236. BSER::Array cmd;
  237. cmd.push_back("subscribe");
  238. cmd.push_back(normalizePath(watcher->mDir));
  239. cmd.push_back(id);
  240. BSER::Array fields;
  241. fields.push_back("name");
  242. fields.push_back("mode");
  243. fields.push_back("exists");
  244. fields.push_back("new");
  245. BSER::Object opts;
  246. opts.emplace("fields", fields);
  247. opts.emplace("since", clock(watcher));
  248. if (watcher->mIgnorePaths.size() > 0) {
  249. BSER::Array ignore;
  250. BSER::Array anyOf;
  251. anyOf.push_back("anyof");
  252. for (auto it = watcher->mIgnorePaths.begin(); it != watcher->mIgnorePaths.end(); it++) {
  253. std::string pathStart = watcher->mDir + DIR_SEP;
  254. if (it->rfind(pathStart, 0) == 0) {
  255. auto relative = it->substr(pathStart.size());
  256. BSER::Array dirname;
  257. dirname.push_back("dirname");
  258. dirname.push_back(relative);
  259. anyOf.push_back(dirname);
  260. }
  261. }
  262. ignore.push_back("not");
  263. ignore.push_back(anyOf);
  264. opts.emplace("expression", ignore);
  265. }
  266. cmd.push_back(opts);
  267. watchmanRequest(cmd);
  268. mSubscriptions.emplace(id, watcher);
  269. mRequestSignal.notify();
  270. }
  271. // This function is called by Backend::unwatch which takes a lock on mMutex
  272. void WatchmanBackend::unsubscribe(WatcherRef watcher) {
  273. std::string id = getId(watcher);
  274. auto erased = mSubscriptions.erase(id);
  275. if (erased) {
  276. BSER::Array cmd;
  277. cmd.push_back("unsubscribe");
  278. cmd.push_back(normalizePath(watcher->mDir));
  279. cmd.push_back(id);
  280. watchmanRequest(cmd);
  281. }
  282. }