conftest.py

import logging
import os
import shlex
import subprocess
import time

import pytest

import pandas._testing as tm

from pandas.io.parsers import read_csv


@pytest.fixture
def tips_file(datapath):
    """Path to the tips dataset"""
    return datapath("io", "data", "csv", "tips.csv")


@pytest.fixture
def jsonl_file(datapath):
    """Path to a JSONL dataset"""
    return datapath("io", "parser", "data", "items.jsonl")


@pytest.fixture
def salaries_table(datapath):
    """DataFrame with the salaries dataset"""
    return read_csv(datapath("io", "parser", "data", "salaries.csv"), sep="\t")


@pytest.fixture
def feather_file(datapath):
    """Path to a feather dataset"""
    return datapath("io", "data", "feather", "feather-0_3_1.feather")


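# Illustrative sketch: a hypothetical test showing how the path fixtures above
# are typically consumed. The function name and assertion are assumptions for
# illustration, not part of the fixture set.
def _example_read_tips(tips_file):
    # read the CSV pointed to by the fixture and sanity-check that it has rows
    df = read_csv(tips_file)
    assert len(df) > 0

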
@pytest.fixture
def s3so(worker_id):
    """
    Storage options pointing at the per-worker moto S3 endpoint.

    Each pytest-xdist worker gets its own port (555<worker number>); a
    non-distributed ("master") run uses port 5555.
    """
    worker_id = "5" if worker_id == "master" else worker_id.lstrip("gw")
    return {"client_kwargs": {"endpoint_url": f"http://127.0.0.1:555{worker_id}/"}}


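# Illustrative sketch of the worker-id -> endpoint mapping used by the ``s3so``
# fixture above and the ``s3_base`` fixture below; the helper name is a
# hypothetical addition for clarity.
def _example_endpoint_for(worker_id: str) -> str:
    # "master" (no xdist) maps to port 5555, "gw0" to 5550, "gw3" to 5553, ...
    suffix = "5" if worker_id == "master" else worker_id.lstrip("gw")
    return f"http://127.0.0.1:555{suffix}/"

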
@pytest.fixture(scope="session")
def s3_base(worker_id):
    """
    Fixture for mocking S3 interaction.

    Sets up a moto server in a separate process.
    """
    pytest.importorskip("s3fs")
    pytest.importorskip("boto3")
    requests = pytest.importorskip("requests")
    logging.getLogger("requests").disabled = True

    with tm.ensure_safe_environment_variables():
        # temporary workaround as moto fails for botocore >= 1.11 otherwise,
        # see https://github.com/spulec/moto/issues/1924 & 1952
        os.environ.setdefault("AWS_ACCESS_KEY_ID", "foobar_key")
        os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "foobar_secret")

        pytest.importorskip("moto", minversion="1.3.14")
        pytest.importorskip("flask")  # server mode needs flask too

        # Launch moto in server mode, i.e., as a separate process
        # with an S3 endpoint on localhost
        worker_id = "5" if worker_id == "master" else worker_id.lstrip("gw")
        endpoint_port = f"555{worker_id}"
        endpoint_uri = f"http://127.0.0.1:{endpoint_port}/"

        # pipe output to /dev/null to avoid cluttering the terminal
        proc = subprocess.Popen(
            shlex.split(f"moto_server s3 -p {endpoint_port}"),
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )

        # OK to go once the server is accepting connections (wait up to ~5 s)
        timeout = 5
        while timeout > 0:
            try:
                r = requests.get(endpoint_uri)
                if r.ok:
                    break
            except Exception:
                pass
            timeout -= 0.1
            time.sleep(0.1)

        yield endpoint_uri

        proc.terminate()
        proc.wait()


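# Illustrative sketch of how a consumer might talk to the endpoint yielded by
# ``s3_base``; the helper name is a hypothetical addition, but it mirrors what
# the ``s3_resource`` fixture below does with boto3.
def _example_list_buckets(s3_base):
    import boto3

    # point a regular boto3 client at the local moto server
    cli = boto3.client("s3", endpoint_url=s3_base)
    return cli.list_buckets()["Buckets"]

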
@pytest.fixture()
def s3_resource(s3_base, tips_file, jsonl_file, feather_file):
    """
    Sets up an S3 bucket with contents.

    The primary bucket name is "pandas-test". The following files
    are uploaded.

    - tips.csv (also as "tips#1.csv")
    - tips.csv.gz
    - tips.csv.bz2
    - items.jsonl
    - simple_dataset.feather

    A private bucket "cant_get_it" is also created. The boto3 s3 resource
    is yielded by the fixture.
    """
    import boto3
    import s3fs

    test_s3_files = [
        ("tips#1.csv", tips_file),
        ("tips.csv", tips_file),
        ("tips.csv.gz", tips_file + ".gz"),
        ("tips.csv.bz2", tips_file + ".bz2"),
        ("items.jsonl", jsonl_file),
        ("simple_dataset.feather", feather_file),
    ]

    def add_tips_files(bucket_name):
        for s3_key, file_name in test_s3_files:
            with open(file_name, "rb") as f:
                cli.put_object(Bucket=bucket_name, Key=s3_key, Body=f)

    bucket = "pandas-test"
    conn = boto3.resource("s3", endpoint_url=s3_base)
    cli = boto3.client("s3", endpoint_url=s3_base)

    try:
        cli.create_bucket(Bucket=bucket)
    except Exception:
        # OK if the bucket already exists
        pass
    try:
        cli.create_bucket(Bucket="cant_get_it", ACL="private")
    except Exception:
        # OK if the bucket already exists
        pass

    # wait (up to ~2 seconds) for the buckets to show up before uploading
    timeout = 2
    while not cli.list_buckets()["Buckets"] and timeout > 0:
        time.sleep(0.1)
        timeout -= 0.1

    add_tips_files(bucket)
    add_tips_files("cant_get_it")
    s3fs.S3FileSystem.clear_instance_cache()
    yield conn

    # clean up: remove both buckets and wait until the listing is empty again
    s3 = s3fs.S3FileSystem(client_kwargs={"endpoint_url": s3_base})

    try:
        s3.rm(bucket, recursive=True)
    except Exception:
        pass
    try:
        s3.rm("cant_get_it", recursive=True)
    except Exception:
        pass

    timeout = 2
    while cli.list_buckets()["Buckets"] and timeout > 0:
        time.sleep(0.1)
        timeout -= 0.1
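

# Illustrative sketch of a test that reads from the mocked bucket; the function
# name is a hypothetical addition, and it assumes a pandas version where
# ``read_csv`` accepts ``storage_options`` (pandas >= 1.2).
def _example_read_tips_from_mock_s3(s3_resource, s3so):
    # ``s3_resource`` guarantees the file exists; ``s3so`` routes the request
    # to the local moto server instead of real AWS
    df = read_csv("s3://pandas-test/tips.csv", storage_options=s3so)
    assert len(df) > 0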