This repository has been archived by the owner on Sep 22, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
__main__.py
executable file
·182 lines (142 loc) · 5.95 KB
/
__main__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
#!/usr/bin/env python3
"""Run script.
"""
import logging
import os
import re
import shlex
import subprocess
import sys
import typing
import zlib
from threading import Condition, Thread
import aiohttp
import requests
from clinner.command import Type as CommandType
from clinner.command import command
from clinner.inputs import bool_input
from clinner.run.main import Main
from tqdm import tqdm
# Logger shared by every command in this script.
logger = logging.getLogger("cli")

# "<directory of this file>/../data": `start` changes into this directory, passes it to siad as
# --sia-directory and stores the consensus database underneath it.
BASE_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "..", "data")

# Location of the consensus database snapshot; ConsensusDB decompresses it as a gzip stream.
CONSENSUS_DB_URL = "https://consensus.siahub.info/consensus.db.gz"

# Matches an output line that starts with "Finished loading". redirect_output treats the first such
# line as the signal to wake the wallet-unlock thread (presumably siad's "ready" message — confirm
# against the siad version in use).
REGEX_LOADING = re.compile(r"Finished loading.*")
class ConsensusDB:
    """Consensus database file stored under ``<base_path>/consensus/consensus.db``.

    The database can be bootstrapped by downloading a gzip-compressed snapshot from ``url``.
    Network operations require the instance to be used as an async context manager, which owns the
    underlying :class:`aiohttp.ClientSession`::

        async with ConsensusDB(base_path, url) as db:
            await db.bootstrap()
    """

    # Number of (compressed) bytes requested from the HTTP response per read: 64 KiB.
    CHUNK_SIZE = 64 * (2 ** 10)

    def __init__(self, base_path: str, url: str):
        """Remember the download URL and make sure the ``consensus`` directory exists.

        :param base_path: Directory under which the ``consensus`` folder is created.
        :param url: URL of the gzip-compressed consensus database.
        """
        self.url = url
        dir_path = os.path.join(base_path, "consensus")
        os.makedirs(dir_path, exist_ok=True)
        self.file_path = os.path.join(dir_path, "consensus.db")

    async def __aenter__(self):
        # total=None: no overall deadline, the snapshot download may take arbitrarily long.
        self._session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=None))
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self._session.close()
        del self._session

    @property
    def exists(self):
        """Whether the consensus database file is present on disk."""
        return os.path.exists(self.file_path)

    @property
    def session(self):
        """HTTP session opened by ``__aenter__``.

        :raises AttributeError: If accessed outside of an ``async with`` block.
        """
        if not hasattr(self, "_session"):
            raise AttributeError("Must be executed under context")
        return self._session

    async def _download_size(self) -> int:
        """Return the ``Content-Length`` reported by a HEAD request, or 0 if the header is missing."""
        async with self.session.head(self.url) as response:
            return int(response.headers.get("Content-Length", "0"))

    async def _download(self) -> typing.AsyncGenerator[typing.Tuple[bytes, int], None]:
        """Stream the remote file, yielding ``(decompressed_bytes, compressed_bytes_read)`` pairs.

        The last pair carries whatever the decompressor still had buffered, with a size of 0.

        :raises zlib.error: If the data is not valid gzip or the stream ends before its end marker.
        """
        # 16 + MAX_WBITS makes zlib expect a gzip header and trailer.
        decompress = zlib.decompressobj(16 + zlib.MAX_WBITS)
        async with self.session.get(self.url) as response:
            while True:
                chunk = await response.content.read(self.CHUNK_SIZE)
                if not chunk:
                    break
                yield (decompress.decompress(chunk), len(chunk))

        tail = decompress.flush()
        if not decompress.eof:
            # The body ended before the gzip end-of-stream marker: the download is truncated (or empty).
            raise zlib.error("Incomplete gzip stream")
        yield (tail, 0)

    async def bootstrap(self):
        """Download and decompress the consensus database, reporting progress through tqdm.

        The data is written to ``<file_path>.part`` and only moved to ``file_path`` once the whole
        stream has been decompressed, so a failed or interrupted download never leaves a partial
        database behind that ``exists`` would report as present.

        Decompression failures are logged and swallowed; any other error propagates to the caller.
        """
        partial_path = self.file_path + ".part"
        try:
            with open(partial_path, "wb") as output, tqdm(
                unit="B", unit_scale=True, unit_divisor=2 ** 10
            ) as progress:
                total = await self._download_size()
                progress.total = total

                # (percent, byte offset) milestones at 10%, 20%, ..., 90% of the reported size.
                milestones = iter([(pct, total / 100 * pct) for pct in range(10, 100, 10)])
                notify_percent, notify_value = next(milestones)

                progress.write(f"Starting download consensus database ({total} bytes)")
                async for (chunk, size) in self._download():
                    output.write(chunk)
                    progress.update(size)

                    # notify_value is falsy when the size is unknown (0) or every milestone was reported.
                    if notify_value and progress.n >= notify_value:
                        progress.write(f"Downloaded {notify_percent}% ({progress.n} bytes)")
                        notify_percent, notify_value = next(milestones, (None, None))

                progress.write("Download finished")

            # The file is closed and complete: publish it under its final name.
            os.replace(partial_path, self.file_path)
        except zlib.error as e:
            logger.error("Failed to download the bootstrap database. %s", e)
        finally:
            # After a successful os.replace there is nothing left to remove.
            try:
                os.remove(partial_path)
            except FileNotFoundError:
                pass
def unlock(condition: Condition):
    """Wait to be notified on *condition*, then ask the local API to unlock the wallet.

    The primary seed is read from the ``UNLOCK_WALLET`` environment variable and posted to
    ``http://localhost:8000/wallet/unlock``. Every failure is logged instead of raised, because this
    function is meant to run as a thread target where nobody could handle the exception.

    :param condition: Condition that is notified once the daemon is ready to accept requests.
    """
    with condition:
        condition.wait()

    # Look the seed up on its own so that only a genuinely missing variable is reported as such.
    try:
        primary_seed = os.environ["UNLOCK_WALLET"]
    except KeyError:
        logger.error("Cannot unlock wallet. Primary seed must be specified under UNLOCK_WALLET environment variable")
        return

    try:
        with requests.session() as session:
            response = session.post("http://localhost:8000/wallet/unlock", json={"primaryseed": primary_seed})
            # Turn HTTP error statuses into exceptions so a rejected unlock is not reported as success.
            response.raise_for_status()
    except requests.RequestException as e:
        logger.error("Cannot unlock wallet. Request to the daemon failed: %s", e)
    else:
        logger.info("Unlock wallet")
def redirect_output(process, condition: Condition):
    """Echo the output of *process* and signal *condition* when loading has finished.

    Every line read from ``process.stdout`` is printed unchanged. The first line matching
    ``REGEX_LOADING`` wakes up all threads waiting on *condition*; later matches are ignored.

    :param process: Object exposing an iterable ``stdout`` that yields text lines.
    :param condition: Condition to notify once the loading message shows up.
    """
    waiting_for_load = True
    try:
        for line in process.stdout:
            print(line, end="")

            if not waiting_for_load:
                continue
            if REGEX_LOADING.match(line) is None:
                continue

            waiting_for_load = False
            with condition:
                condition.notify_all()
    except ValueError:
        # Presumably raised when the pipe gets closed while it is still being read; nothing left to forward.
        pass
@command(
    command_type=CommandType.PYTHON,
    args=(
        (("--bootstrap",), {"help": "Force bootstrap consensus database", "action": "store_true"}),
        (("--no-bootstrap",), {"help": "Do not bootstrap consensus database", "action": "store_true"}),
        (("--unlock",), {"help": "Unlock wallet", "action": "store_true"}),
    ),
    parser_opts={"help": "Start Sia daemon. Bootstrap the consensus database if it is not found."},
)
async def start(*args, **kwargs):
    """Run siad inside ``BASE_PATH``, optionally bootstrapping the consensus database first.

    The database is only downloaded when its file is missing, ``no_bootstrap`` is not set and either
    ``bootstrap`` is set or the user confirms the interactive prompt. A socat process forwards TCP
    port 8000 to ``localhost:9980`` for as long as siad runs, and siad's output is echoed by a
    background thread.

    :param args: Extra command line arguments appended verbatim to the siad invocation.
    :param kwargs: Parsed options; ``bootstrap``, ``no_bootstrap`` and ``unlock`` are read.
    """
    os.chdir(BASE_PATH)

    consensus = ConsensusDB(base_path=BASE_PATH, url=CONSENSUS_DB_URL)
    if (
        not consensus.exists
        and not kwargs["no_bootstrap"]
        and (kwargs["bootstrap"] or bool_input("Do you want to bootstrap consensus database?"))
    ):
        logger.info("Bootstrapping consensus database")
        async with consensus:
            await consensus.bootstrap()
    else:
        logger.info("Skip bootstrap consensus database")

    condition = Condition()
    if kwargs["unlock"]:
        # Daemon thread: if siad stops before announcing that it finished loading, the thread would
        # otherwise wait on the condition forever and keep the interpreter from exiting.
        unlock_thread = Thread(name="unlock", target=unlock, args=(condition,), daemon=True)
        unlock_thread.start()

    socat_cmd = shlex.split("socat tcp-listen:8000,reuseaddr,fork tcp:localhost:9980")
    siad_cmd = shlex.split(f"siad --sia-directory {os.getcwd()}") + list(args)

    # Pre-bind both names so the final report works even if one of the processes never started.
    socat_process = None
    siad_process = None
    try:
        with subprocess.Popen(socat_cmd) as socat_process:
            try:
                with subprocess.Popen(
                    siad_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=1, universal_newlines=True
                ) as siad_process:
                    redirect_output_thread = Thread(
                        name="redirect_output", target=redirect_output, args=(siad_process, condition)
                    )
                    redirect_output_thread.start()
                    siad_process.wait()
            finally:
                # The forwarder is useless without siad; stop it however siad ended (or failed to start).
                socat_process.terminate()
    except KeyboardInterrupt:
        pass
    finally:
        # %s rather than %d: returncode is None while a process has not been reaped yet.
        if siad_process is not None:
            logger.info("Siad command finished with code %s", siad_process.returncode)
        if socat_process is not None:
            logger.info("Socat command finished with code %s", socat_process.returncode)
# Script entry point: run the command line interface and exit with the value it returns.
if __name__ == "__main__":
    exit_status = Main().run()
    sys.exit(exit_status)