test_main.py 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781
  1. #!/usr/bin/env python3
  2. # -*- coding: UTF-8 -*-
import base64
import json
import logging
import os
import shutil
import sys
import time
import unittest
import urllib
import urllib.parse
from unittest import mock

import requests
from tornado import testing, web
# Directory containing this test file, and the directory two levels above it.
testdir = os.path.dirname(os.path.realpath(__file__))
projdir = os.path.realpath(testdir + "/../../")
# Presumably needed so the `webserver` package imported below resolves — TODO confirm.
sys.path.append(projdir)
  18. import webserver.handlers.base
  19. import webserver.handlers.book
  20. from webserver import loader, main, models, plugins # nosq: E402
  21. from webserver.handlers.base import BaseHandler
# Module-level fixtures, assigned by setup_server(), setup_mock_user() and
# setup_mock_sendmail() respectively.
_app = None
_mock_user = None
_mock_mail = None

# The bare string below is a no-op expression used as a comment listing fixture books.
# Columns look like: id, format, size, "title - author" — TODO confirm.
'''
1 EPUB 440912 Bai Nian Gu Du - Jia Xi Ya Ma Er Ke Si
2 TXT 298421 Man Man Zi You Lu - Unknown
3 MOBI 2662851 An Tu Sheng Tong Hua - An Tu Sheng
4 AZW3 344989 Mai Ken Xi Fang Fa (Jing Guan Tu Shu De Ch - Ai Sen _La Sai Er (Ethan M.Rasiel)
5 PDF 6127496 E Yu Pa Pa Ya Yi Pa Pa - Unknown
6 EPUB 324726 Tang Shi San Bai Shou - Wei Zhi
'''
# Book ids named after the file format listed for them above.
BID_EPUB = 1
BID_TXT = 2
BID_MOBI = 3
BID_AZW3 = 4
BID_PDF = 5
# Ids 1..5 only; id 6 from the listing above is not included.
BIDS = list(range(1,6))
  39. def setup_server():
  40. global _app
  41. # copy new db
  42. shutil.copyfile(testdir+"/cases/users.db", testdir+"/library/users.db")
  43. shutil.copyfile(testdir+"/cases/metadata.db", testdir+"/library/metadata.db")
  44. # set env
  45. main.options.with_library = testdir + "/library/"
  46. main.CONF["scan_upload_path"] = testdir + "/cases/"
  47. main.CONF["ALLOW_GUEST_PUSH"] = False
  48. main.CONF["ALLOW_GUEST_DOWNLOAD"] = False
  49. main.CONF["upload_path"] = "/tmp/"
  50. main.CONF["html_path"] = "/tmp/"
  51. main.CONF["settings_path"] = "/tmp/"
  52. main.CONF["progress_path"] = "/tmp/"
  53. main.CONF["nuxt_env_path"] = "/tmp/.env.text"
  54. main.CONF["installed"] = True
  55. main.CONF["INVITE_MODE"] = False
  56. main.CONF["user_database"] = "sqlite:///%s/library/users.db" % testdir
  57. main.CONF["db_engine_args"] = {"echo": True}
  58. if _app is None:
  59. _app = main.make_app()
  60. def setup_mock_user():
  61. global _mock_user
  62. _mock_user = mock.patch.object(BaseHandler, "user_id", return_value=1)
  63. def setup_mock_sendmail():
  64. global _mock_mail
  65. _mock_mail = mock.patch("calibre.utils.smtp.sendmail", return_value="Yo")
  66. def get_db():
  67. return _app.settings["ScopedSession"]
  68. def Q(s):
  69. if not isinstance(s, str):
  70. s = str(s)
  71. return urllib.parse.quote(s.encode("UTF-8"))
  72. class FakeHandler(BaseHandler):
  73. def __init__(h):
  74. h.request = h
  75. h.request.headers = {}
  76. h.request.remote_ip = "1.2.3.4"
  77. h.rsp_headers = {}
  78. h.rsp = None
  79. h.cookie = {}
  80. h.session = get_db()
  81. def write(self, rsp):
  82. self.rsp = rsp
  83. def finish(self):
  84. return None
  85. def set_header(self, k, v):
  86. self.rsp_headers[k] = v
  87. def set_secure_cookie(self, k, v):
  88. self.cookie[k] = v
  89. class TestApp(testing.AsyncHTTPTestCase):
  90. def get_app(self):
  91. return _app
  92. def json(self, url, *args, **kwargs):
  93. if 'request_timeout' not in kwargs:
  94. kwargs['request_timeout'] = 60
  95. rsp = self.fetch(url, *args, **kwargs)
  96. self.assertEqual(rsp.code, 200)
  97. return json.loads(rsp.body)
  98. def gt(self, n, at_least):
  99. self.assertEqual(n, max(n, at_least))
  100. def assert_book_simple(self, book):
  101. self.assertTrue("img" in book)
  102. self.assertTrue("authors" in book)
  103. self.assertTrue("comments" in book)
  104. self.assertTrue("id" in book)
  105. self.assertTrue("publisher" in book)
  106. self.assertTrue("rating" in book)
  107. self.assertTrue("title" in book)
  108. def assert_book_fields(self, book):
  109. self.assert_book_simple(book)
  110. self.assertTrue("tags" in book)
  111. self.assertTrue("timestamp" in book)
  112. self.assertTrue("collector" in book)
  113. self.assertTrue("count_visit" in book)
  114. self.assertTrue("count_download" in book)
  115. self.assertTrue("pubdate" in book)
  116. self.assertTrue("series" in book)
  117. def assert_book(self, book):
  118. self.assert_book_fields(book)
  119. d = self.json("/api/book/%s" % book["id"])["book"]
  120. self.assert_book_fields(d)
  121. self.gt(len(d["files"]), 1)
  122. def assert_book_list(self, d, at_least):
  123. self.assertEqual(d["err"], "ok")
  124. self.gt(d["total"], at_least)
  125. n = len(d["books"])
  126. if n < 30:
  127. self.assertEqual(d["total"], n)
  128. self.assertTrue(n <= 30)
  129. for book in d["books"]:
  130. self.assert_book(book)
  131. class TestAppWithoutLogin(TestApp):
  132. def test_index(self):
  133. d = self.json("/api/index?random=7&recent=9")
  134. self.assertEqual(len(d["new_books"]), 9)
  135. self.assertEqual(len(d["random_books"]), 7)
  136. for book in d["new_books"] + d["random_books"]:
  137. self.assert_book_simple(book)
  138. book = d["new_books"][0]
  139. for author in book["authors"]:
  140. self.json("/api/author/" + Q(author))
  141. self.json("/api/author/" + Q(book["author"]))
  142. self.json("/api/publisher/" + Q(book["publisher"]))
  143. def test_search(self):
  144. d = self.json("/api/search")
  145. self.assertEqual(d["err"], "params.invalid")
  146. d = self.json("/api/search?hack")
  147. self.assertEqual(d["err"], "params.invalid")
  148. d = self.json("/api/search?name=NotFound")
  149. self.assertEqual(d["err"], "ok")
  150. self.assertEqual(d["total"], 0)
  151. self.assertEqual(len(d["books"]), 0)
  152. d = self.json("/api/search?name=A")
  153. self.assert_book_list(d, 6)
  154. def test_hot(self):
  155. d = self.json("/api/hot")
  156. self.assert_book_list(d, 0)
  157. def test_recent(self):
  158. d = self.json("/api/recent")
  159. self.assert_book_list(d, 10)
  160. def test_download(self):
  161. rsp = self.fetch("/api/book/1.epub", follow_redirects=False)
  162. self.assertEqual(rsp.code, 302)
  163. def test_push(self):
  164. d = self.json("/api/book/1/push", method="POST", body="mail_to=unittest@gmail.com")
  165. self.assertEqual(d["err"], "user.need_login")
  166. def test_user_info(self):
  167. d = self.json("/api/user/info")
  168. self.assertEqual(d["user"]["is_login"], False)
  169. self.assertEqual(d["user"]["is_admin"], False)
  170. self.assertEqual(d["sys"]["books"], 13)
  171. d = self.json("/api/user/info?detail=1")
  172. self.assertEqual(d["user"]["is_login"], False)
  173. self.assertEqual(d["sys"], {})
  174. def test_book(self):
  175. d = self.json("/api/book/1")
  176. self.assertEqual(d["err"], "ok")
  177. class TestMeta(TestApp):
  178. def assert_meta(self, meta, total):
  179. d = self.json("/api/%s" % meta)
  180. self.assertEqual(len(d["items"]), total)
  181. for m in d["items"]:
  182. url = "/api/%s/%s" % (meta, Q(m["name"]))
  183. a = self.json(url)
  184. self.gt(len(a["books"]), 1)
  185. b = self.json(url + "?size=3")
  186. c = self.json(url + "?start=3")
  187. d = self.json(url + "?sort=timestamp")
  188. self.assertEqual(a["total"], b["total"])
  189. self.assertEqual(a["total"], c["total"])
  190. self.assertEqual(a["total"], d["total"])
  191. def test_tag(self):
  192. self.assert_meta("tag", 72)
  193. def test_author(self):
  194. self.assert_meta("author", 15)
  195. def test_series(self):
  196. self.assert_meta("series", 7)
  197. def test_publisher(self):
  198. self.assert_meta("publisher", 10)
  199. def test_rating(self):
  200. self.assert_meta("rating", 3)
  201. class AutoResetPermission:
  202. def __init__(self, arg):
  203. if not arg:
  204. arg = models.Reader.id == 1
  205. self.arg = arg
  206. def __enter__(self):
  207. self.user = get_db().query(models.Reader).filter(self.arg).first()
  208. self.user.permission = ""
  209. return self.user
  210. def __exit__(self, type, value, trace):
  211. self.user.permission = ""
  212. def mock_permission(arg=None):
  213. return AutoResetPermission(arg)
  214. class TestWithUserLogin(TestApp):
  215. @classmethod
  216. def setUpClass(self):
  217. self.user = _mock_user.start()
  218. self.mail = _mock_mail.start()
  219. self.user.return_value = 1
  220. self.mail.return_value = True
  221. @classmethod
  222. def tearDownClass(self):
  223. _mock_user.stop()
  224. _mock_mail.stop()
  225. class TestUser(TestWithUserLogin):
  226. def test_userinfo(self):
  227. d = self.json("/api/user/info")
  228. self.assertEqual(d["err"], "ok")
  229. self.assertEqual(d["user"]["is_login"], True)
  230. d = self.json("/api/user/info?detail=1")
  231. self.assertEqual(d["err"], "ok")
  232. self.assertEqual(d["user"]["is_login"], True)
  233. self.assertEqual(d["sys"], {})
  234. self.assertTrue(len(d["user"]["extra"]["download_history"]) >= 1)
  235. def add_user(self):
  236. self.mail.reset_mock()
  237. body = "email=active@gmail.com&nickname=active&username=active&password=active66"
  238. d = self.json("/api/user/sign_up", method="POST", raise_error=True, body=body)
  239. self.assertTrue(d["err"] in ["ok", "params.username.exist"])
  240. def test_login(self):
  241. # rsp = self.fetch("/api/done", follow_redirects=False)
  242. # self.assertEqual(rsp.code, 302)
  243. self.add_user()
  244. user = get_db().query(models.Reader).filter(models.Reader.username == "active").first()
  245. user.permission = ""
  246. d = self.json("/api/user/sign_in", method="POST", body="username=active&password=active66")
  247. self.assertEqual(d["err"], "ok")
  248. user = get_db().query(models.Reader).filter(models.Reader.username == "active").first()
  249. user.set_permission("L")
  250. logging.debug("user[%s] id[%s] permission[%s]", user.username, user.id, user.permission)
  251. d = self.json("/api/user/sign_in", method="POST", body="username=active&password=active66")
  252. self.assertEqual(d["err"], "permission")
  253. user = get_db().query(models.Reader).filter(models.Reader.username == "active").first()
  254. user.set_permission("l")
  255. d = self.json("/api/user/sign_in", method="POST", body="username=active&password=active66")
  256. self.assertEqual(d["err"], "ok")
  257. class TestFile(TestWithUserLogin):
  258. def test_get_cover(self):
  259. rsp = self.fetch("/get/cover/1.jpg", follow_redirects=False)
  260. self.assertEqual(rsp.code, 200)
  261. def test_get_thumb(self):
  262. rsp = self.fetch("/get/thumb_80x80/1.jpg", follow_redirects=False)
  263. self.assertEqual(rsp.code, 200)
  264. def test_get_opf(self):
  265. rsp = self.fetch("/get/opf/1", follow_redirects=False)
  266. self.assertEqual(rsp.code, 200)
  267. class TestBook(TestWithUserLogin):
  268. def test_nav(self):
  269. rsp = self.fetch("/api/book/nav")
  270. self.assertEqual(rsp.code, 200)
  271. def test_download(self):
  272. rsp = self.fetch("/api/book/1.epub")
  273. self.assertEqual(rsp.code, 200)
  274. rsp = self.fetch("/api/book/1.pdf")
  275. self.assertEqual(rsp.code, 404)
  276. def test_download_permission(self):
  277. with mock_permission() as user:
  278. user.set_permission("S") # forbid
  279. rsp = self.fetch("/api/book/1.epub")
  280. self.assertEqual(rsp.code, 403)
  281. user.set_permission("s") # allow
  282. rsp = self.fetch("/api/book/1.epub")
  283. self.assertEqual(rsp.code, 200)
  284. def mock_convert(self):
  285. class MockConvertPath:
  286. def __init__(self, path=None):
  287. if not path:
  288. path = testdir + "/cases/old.epub"
  289. self.mock1 = mock.patch.object(webserver.handlers.book.BookPush, "get_path_of_fmt", return_value=path)
  290. self.mock2 = mock.patch("webserver.handlers.book.do_ebook_convert", return_value=True)
  291. def __enter__(self):
  292. self.mock1.start()
  293. return self.mock2.start()
  294. def __exit__(self, type, value, trace):
  295. self.mock1.stop()
  296. self.mock2.stop()
  297. return MockConvertPath()
  298. def test_push(self):
  299. with self.mock_convert() as m:
  300. d = self.json("/api/book/1/push", method="POST", body="mail_to=unittest@gmail.com")
  301. self.assertEqual(d["err"], "ok")
  302. self.assertTrue(m.call_count + self.mail.call_count <= 2)
  303. def test_push_permission(self):
  304. with self.mock_convert():
  305. with mock_permission() as user:
  306. # forbid
  307. user.set_permission("P")
  308. d = self.json("/api/book/1/push", method="POST", body="mail_to=unittest@gmail.com")
  309. self.assertEqual(d["err"], "permission")
  310. with self.mock_convert() as m:
  311. with mock_permission() as user:
  312. # allow
  313. user.set_permission("p")
  314. d = self.json("/api/book/1/push", method="POST", body="mail_to=unittest@gmail.com")
  315. self.assertEqual(d["err"], "ok")
  316. self.assertTrue(m.call_count + self.mail.call_count <= 2)
  317. def test_delete(self):
  318. global _app
  319. with mock.patch.object(_app.settings["legacy"], "delete_book", return_value="Yo") as m:
  320. # because 'delete' trigger a refresh
  321. with mock_permission() as user:
  322. # default
  323. d = self.json("/api/book/1/delete", method="POST", body="")
  324. self.assertEqual(d["err"], "ok")
  325. self.assertEqual(m.call_count, 1)
  326. with mock_permission() as user:
  327. user.set_permission("D") # forbid
  328. d = self.json("/api/book/1/delete", method="POST", body="")
  329. self.assertEqual(d["err"], "permission")
  330. self.assertEqual(m.call_count, 1)
  331. with mock_permission() as user:
  332. user.set_permission("d") # allow
  333. d = self.json("/api/book/1/delete", method="POST", body="")
  334. self.assertEqual(d["err"], "ok")
  335. self.assertEqual(m.call_count, 2)
  336. def test_read(self):
  337. with mock.patch.object(webserver.handlers.book.BookRead, "extract_book", return_value="Yo"):
  338. for bid in BIDS:
  339. rsp = self.fetch("/read/%s" % bid, follow_redirects=False)
  340. self.assertEqual(rsp.code, 302 if bid == BID_PDF else 200)
  341. def test_edit(self):
  342. body = {
  343. "id": 5,
  344. "title": "老人与海",
  345. "rating": 8,
  346. "count_visit": 4,
  347. "count_download": 1,
  348. "timestamp": "2022-01-20",
  349. "pubdate": "1999-10-01",
  350. "collector": "admin",
  351. "authors": ["海明威"],
  352. "author": "海明威",
  353. "tags": ["海明威", "老人与海", "外国文学", "经典", "励志", "小说", "名著", "美国"],
  354. "author_sort": "海明威",
  355. "publisher": "上海译文出版社",
  356. "comments": "本书讲述了一个渔夫的故事。古巴老渔夫圣地亚哥在连续八十四天没捕到鱼的情况下。",
  357. "series": "x",
  358. "language": None,
  359. "isbn": "9787532723447",
  360. "is_public": True,
  361. "is_owner": True,
  362. }
  363. with mock.patch.object(_app.settings["legacy"], "set_metadata", return_value="Yo"):
  364. r = self.json("/api/book/1/edit", method="POST", raise_error=True, body=json.dumps(body))
  365. self.assertEqual(r["err"], "ok")
  366. class TestReferDouban(TestWithUserLogin):
  367. def setUp(self):
  368. self.douban_url = "http://10.0.0.15:7001"
  369. try:
  370. requests.get(self.douban_url, timeout=2)
  371. except:
  372. self.skipTest("without douban plugin, skip refer test")
  373. super().setUp()
  374. def tttest_refer(self):
  375. # with mock.patch("plugins.meta.baike.BaiduBaikeApi.get_book", return_value=self.fake_baidu) as m:
  376. main.CONF["douban_baseurl"] = self.douban_url
  377. d = self.json("/api/book/1/refer")
  378. self.assertEqual(d["err"], "ok")
  379. global _app
  380. with mock.patch.object(_app.settings["legacy"], "set_metadata", return_value="Yo"):
  381. for book in d["books"]:
  382. body = "provider_key=%(provider_key)s&provider_value=%(provider_value)s" % book
  383. r = self.json("/api/book/1/refer", method="POST", raise_error=True, body=body)
  384. self.assertEqual(r["err"], "ok")
  385. class TestRefer(TestWithUserLogin):
  386. @mock.patch("webserver.plugins.meta.douban.DoubanBookApi.search_books")
  387. @mock.patch("webserver.plugins.meta.douban.DoubanBookApi.get_book_by_isbn")
  388. @mock.patch("webserver.plugins.meta.douban.DoubanBookApi.get_book_by_id")
  389. @mock.patch("webserver.plugins.meta.douban.DoubanBookApi.get_cover")
  390. @mock.patch("webserver.plugins.meta.baike.BaiduBaikeApi._baike")
  391. @mock.patch("webserver.plugins.meta.baike.BaiduBaikeApi.get_cover")
  392. def test_refer(self, m6, m5, m4, m3, m2, m1):
  393. from tests.test_douban import DOUBAN_BOOK, DOUBAN_SEARCH
  394. from tests.test_baike import BAIKE_PAGE
  395. m1.return_value = DOUBAN_SEARCH['books']
  396. m2.return_value = dict(DOUBAN_BOOK)
  397. m3.return_value = dict(DOUBAN_BOOK)
  398. m4.return_value = ("jpg", b"image-body")
  399. m5.return_value = BAIKE_PAGE
  400. m6.return_value = ("jpg", b"image-body")
  401. # with mock.patch("plugins.meta.baike.BaiduBaikeApi.get_book", return_value=self.fake_baidu) as m:
  402. # main.CONF["douban_baseurl"] = self.douban_url
  403. d = self.json("/api/book/1/refer")
  404. self.assertEqual(d["err"], "ok")
  405. global _app
  406. with mock.patch.object(_app.settings["legacy"], "set_metadata", return_value="Yo"):
  407. for book in d["books"]:
  408. body = "provider_key=%(provider_key)s&provider_value=%(provider_value)s" % book
  409. r = self.json("/api/book/1/refer", method="POST", raise_error=True, body=body)
  410. self.assertEqual(r["err"], "ok")
  411. class TestUserSignUp(TestWithUserLogin):
  412. @classmethod
  413. def setUpClass(self):
  414. self.user = _mock_user.start()
  415. self.user.return_value = 1
  416. self.mail = _mock_mail.start()
  417. self.mail.return_value = True
  418. @classmethod
  419. def tearDownClass(self):
  420. self.delete_user()
  421. _mock_mail.stop()
  422. @classmethod
  423. def get_user(self):
  424. return get_db().query(models.Reader).filter(models.Reader.username == "unittest")
  425. @classmethod
  426. def delete_user(self):
  427. self.get_user().delete()
  428. get_db().commit()
  429. def test_signup(self):
  430. self.delete_user()
  431. self.mail.reset_mock()
  432. d = self.json("/api/user/sign_up", method="POST", raise_error=True, body="")
  433. self.assertEqual(d["err"], "params.invalid")
  434. body = "email=unittest@gmail.com&nickname=unittest&username=unittest&password=unittest"
  435. d = self.json("/api/user/sign_up", method="POST", raise_error=True, body=body)
  436. self.assertEqual(d["err"], "ok")
  437. self.assertEqual(self.mail.call_count, 1)
  438. user = self.get_user().first()
  439. self.assertEqual(user.name, "unittest")
  440. self.assertEqual(user.email, "unittest@gmail.com")
  441. self.assertEqual(user.active, False)
  442. code = "123456"
  443. rsp = self.fetch("/api/active/%s/%s" % ("unittest", code))
  444. self.assertEqual(rsp.code, 403)
  445. code = user.get_active_code()
  446. rsp = self.fetch("/api/active/%s/%s" % ("unittest", code), follow_redirects=False)
  447. self.assertEqual(rsp.code, 302)
  448. user = self.get_user().first()
  449. self.assertEqual(user.active, True)
  450. self.user.return_value = user.id
  451. d = self.json("/api/user/active/send")
  452. self.assertEqual(d["err"], "ok")
  453. self.assertEqual(self.mail.call_count, 2)
  454. # build fake auth header unittest:unittest
  455. f = FakeHandler()
  456. f.request.headers["Authorization"] = "xxxxx"
  457. self.assertEqual(False, BaseHandler.process_auth_header(f))
  458. f.request.headers["Authorization"] = self.auth("username:password")
  459. self.assertEqual(False, BaseHandler.process_auth_header(f))
  460. f.request.headers["Authorization"] = self.auth("unittest:password")
  461. self.assertEqual(False, BaseHandler.process_auth_header(f))
  462. ts = int(time.time())
  463. f.request.headers["Authorization"] = self.auth("unittest:unittest")
  464. self.assertEqual(True, BaseHandler.process_auth_header(f))
  465. self.assertTrue(int(f.cookie["invited"]) >= ts)
  466. self.assertTrue(int(f.cookie["lt"]) >= ts)
  467. self.assertTrue(int(f.cookie["lt"]) >= ts)
  468. self.delete_user()
  469. def auth(self, s):
  470. return "Basic " + base64.encodebytes(s.encode("ascii")).decode("ascii")
  471. class TestWithAdminUser(TestApp):
  472. @classmethod
  473. def setUpClass(self):
  474. self.user = _mock_user.start()
  475. self.user.return_value = 1
  476. @classmethod
  477. def tearDownClass(self):
  478. _mock_user.stop()
  479. class TestAdmin(TestWithAdminUser):
  480. def test_admin_users(self):
  481. d = self.json("/api/admin/users")
  482. self.assertEqual(d["err"], "ok")
  483. self.assertEqual(d["users"]["total"], 31)
  484. def test_admin_settings(self):
  485. d = self.json("/api/admin/settings")
  486. self.assertEqual(d["err"], "ok")
  487. self.assertTrue(len(d["settings"]) > 10)
  488. with mock.patch.object(loader.SettingsLoader, "set_store_path", return_value="/tmp/"):
  489. req = {"site_title": "abc", "not_work": "en"}
  490. d = self.json("/api/admin/settings", method="POST", body=json.dumps(req))
  491. self.assertEqual(d["err"], "ok")
  492. self.assertEqual(d["rsp"]["site_title"], "abc")
  493. self.assertTrue("not_work" not in d["rsp"])
  494. class TestOpds(TestWithUserLogin):
  495. def parse_xml(self, text):
  496. from xml.parsers.expat import ParserCreate
  497. ParserCreate()
  498. def test_opds(self):
  499. rsp = self.fetch("/opds/")
  500. self.assertEqual(rsp.code, 200)
  501. self.parse_xml(rsp.body)
  502. def test_opds_nav(self):
  503. rsp = self.fetch("/opds/nav/4e617574686f7273?offset=1")
  504. self.assertEqual(rsp.code, 200)
  505. self.parse_xml(rsp.body)
  506. def test_opds_nav2(self):
  507. main.CONF["opds_max_ungrouped_items"] = 2
  508. navs = [
  509. b'Nauthors',
  510. b'Nlanguages',
  511. b'Npublisher',
  512. b'Nrating',
  513. b'Nseries',
  514. b'Ntags',
  515. b'Onewest',
  516. b'Otitle',
  517. ]
  518. groups = [
  519. 2,
  520. main.CONF["opds_max_ungrouped_items"],
  521. ]
  522. for nav in navs:
  523. url = "/opds/nav/%s" % nav.hex()
  524. for g in groups:
  525. main.CONF["opds_max_ungrouped_items"] = g
  526. rsp = self.fetch(url)
  527. self.assertEqual(rsp.code, 200)
  528. self.parse_xml(rsp.body)
  529. def test_opds_category(self):
  530. a = b'tags'.hex()
  531. b = b'I71:tags'.hex()
  532. rsp = self.fetch("/opds/category/%s/%s" % (a,b))
  533. self.assertEqual(rsp.code, 200)
  534. self.parse_xml(rsp.body)
  535. @unittest.skip("category里的search功能暂时搞不懂,以后考虑删掉")
  536. def test_opds_category_search(self):
  537. b = ("I"+urllib.parse.quote("韩寒")+":authors").encode("UTF-8").hex()
  538. a = b"search".hex()
  539. b = b'I5:authors'.hex()
  540. b = "I文学:tags".encode("UTF-8").hex()
  541. rsp = self.fetch("/opds/category/%s/%s" % (a, b))
  542. self.assertEqual(rsp.code, 200, rsp.body)
  543. self.parse_xml(rsp.body)
  544. def test_opds_category_group(self):
  545. a = b'tags'.hex()
  546. b = b'C'.hex()
  547. rsp = self.fetch("/opds/categorygroup/%s/%s" % (a,b))
  548. self.assertEqual(rsp.code, 200)
  549. self.parse_xml(rsp.body)
  550. def test_opds_search(self):
  551. rsp = self.fetch("/opds/search/%s" % urllib.parse.quote("韩寒"))
  552. self.assertEqual(rsp.code, 200)
  553. self.parse_xml(rsp.body)
  554. def test_opds_search_not_found(self):
  555. rsp = self.fetch("/opds/search/%s" % urllib.parse.quote("豪士"))
  556. self.assertEqual(rsp.code, 404)
  557. def test_opds_without_login(self):
  558. main.CONF["INVITE_MODE"] = True
  559. rsp = self.fetch("/opds/nav/%s" % b'Otitle'.hex())
  560. self.assertEqual(rsp.code, 401)
  561. main.CONF["INVITE_MODE"] = False
  562. class TestConvert(TestApp):
  563. def test_convert(self):
  564. fin = testdir + "/cases/old.epub"
  565. fout = "/tmp/output.mobi"
  566. flog = "/tmp/output.log"
  567. ok = webserver.handlers.book.do_ebook_convert(fin, fout, flog)
  568. self.assertEqual(ok, True)
  569. class TestJsonResponse(TestApp):
  570. def raise_(self, err):
  571. raise err
  572. def assertHeaders(self, headers):
  573. self.assertEqual(
  574. headers,
  575. {
  576. "Access-Control-Allow-Origin": "*",
  577. "Access-Control-Allow-Credentials": "true",
  578. "Cache-Control": "max-age=0",
  579. },
  580. )
  581. def test_err(self):
  582. f = FakeHandler()
  583. with mock.patch("traceback.format_exc", return_value=""):
  584. webserver.handlers.base.js(lambda x: self.raise_(RuntimeError()))(f)
  585. self.assertTrue(isinstance(f.rsp["msg"], str))
  586. self.assertEqual(f.rsp["err"], "exception")
  587. self.assertHeaders(f.rsp_headers)
  588. def test_finish(self):
  589. f = FakeHandler()
  590. with mock.patch("traceback.format_exc", return_value=""):
  591. webserver.handlers.base.js(lambda x: self.raise_(web.Finish()))(f)
  592. self.assertEqual(f.rsp, "")
  593. self.assertHeaders(f.rsp_headers)
  594. class TestInviteMode(TestApp):
  595. def setUp(self):
  596. main.CONF["INVITE_MODE"] = True
  597. TestApp.setUp(self)
  598. def tearDown(self):
  599. main.CONF["INVITE_MODE"] = False
  600. TestApp.tearDown(self)
  601. def test_index(self):
  602. d = self.json("/api/index")
  603. self.assertEqual(d["err"], "not_invited")
  604. d = self.json("/api/book/1")
  605. self.assertEqual(d["err"], "not_invited")
  606. r = self.fetch("/api/book/1.epub")
  607. self.assertEqual(r.code, 401)
  608. def test_opds_index(self):
  609. r = self.fetch("/opds/")
  610. self.assertEqual(r.code, 401)
  611. r = self.fetch("/opds/nav/4e617574686f7273?offset=1")
  612. self.assertEqual(r.code, 401)
  613. def setUpModule():
  614. os.environ["ASYNC_TEST_TIMEOUT"] = "60"
  615. logging.basicConfig(
  616. level=logging.DEBUG,
  617. datefmt="%Y-%m-%d %H:%M:%S",
  618. format="%(asctime)s %(levelname)7s %(pathname)s:%(lineno)d %(message)s",
  619. )
  620. setup_server()
  621. setup_mock_user()
  622. setup_mock_sendmail()
# Run the tests through unittest when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()