Commit f717d268 authored by André Melo's avatar André Melo
Browse files

Remove legacy code, prettify code with black

parent c1aeeeb8
Pipeline #31625 passed with stage
in 9 minutes and 35 seconds
......@@ -52,20 +52,20 @@ from scipy.spatial import cKDTree
# + Collapsed="false"
def preprint_ids(only_fresh=True):
    """Return the arXiv ids of today's cond-mat preprints from the RSS feed.

    Parameters
    ----------
    only_fresh : bool
        When True, return an empty list if the feed was last updated more
        than one day ago (nothing new to process).

    Returns
    -------
    list of str
        Bare arXiv identifiers (the part after ``abs/``), excluding
        entries whose title marks them as UPDATED versions.
    """
    feed = feedparser.parse("https://export.arxiv.org/rss/cond-mat")
    if feed["bozo"]:  # Fetch failed
        return []
    published = datetime.fromtimestamp(mktime(feed.updated_parsed))
    if only_fresh and datetime.now() - published > timedelta(days=1):
        return []
    preprints = feed["entries"]
    return [
        preprint["id"].split("abs/")[-1]
        for preprint in preprints
        if "UPDATED" not in preprint["title"]
        # TODO: We could check if the new preprint version is improved
    ]
......@@ -74,14 +74,17 @@ def preprint_ids(only_fresh=True):
# ## Download PDFs

# + Collapsed="false"
# Image formats we extract directly; everything in IGNORE_EXTS is known
# TeX-source baggage that carries no figures.
ALLOWED_EXTS = ["png", "jpg", "jpeg", "tiff"]
IGNORE_EXTS = ["tex", "bst", "cls", "bbl", "dat", "rtx", "aux", "bib", "sty"]
def pdf_url(paper_id):
    """Return the arXiv PDF URL for *paper_id*."""
    return f"https://www.arxiv.org/pdf/{paper_id}"
def preprint_url(paper_id):
    """Return the arXiv e-print (source tarball) URL for *paper_id*."""
    return f"https://www.arxiv.org/e-print/{paper_id}"
def response(paper_id):
"""Request response objects corresponding to each preprint of the day"""
......@@ -91,22 +94,24 @@ def response(paper_id):
if r.status_code == 200:
return r
elif r.status_code == 403:
raise RuntimeError('Rate limit exceeded')
raise RuntimeError("Rate limit exceeded")
else:
raise RuntimeError(f'Unknown error, f{r.status_code}')
raise RuntimeError(f"Unknown error, f{r.status_code}")
def convert_to_rgb(img):
    """Return *img* converted to RGB mode.

    RGBA images are composited onto a white background using their alpha
    channel as the mask; other known modes go through PIL's converter.
    Unknown modes are converted anyway, with a warning.
    """
    if img.mode in ["L", "CMYK", "HSV", "RGB"]:
        return img.convert("RGB")
    elif img.mode == "RGBA":
        # Create a white background, and paste the RGBA image
        # on top of it with the alpha channel as the mask
        background = PIL.Image.new("RGB", img.size, (255, 255, 255))
        background.paste(img, mask=img.split()[-1])
        return background
    else:
        logging.warning(f"Unknown image mode {img.mode}")
        return img.convert("RGB")
def paper_images(paths):
for file in paths:
......@@ -123,21 +128,22 @@ def paper_images(paths):
continue
yield image
def extract_images_pdf(pdf):
    """Yield the images embedded in raw PDF bytes *pdf*.

    Writes the bytes to a temporary file, runs the poppler ``pdfimages``
    tool on it, and yields the extracted images via :func:`paper_images`.
    """
    with tempfile.TemporaryDirectory() as tmp_dir:
        root = Path(tmp_dir)
        pdf_file = root / "preprint.pdf"
        pdf_file.write_bytes(pdf)
        # List-argument form avoids going through the shell; the paths
        # come from tempfile so they contain no user-controlled text.
        subprocess.call(
            ["pdfimages", "-all", str(pdf_file), str(root / "image")],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        paths = root.glob("image*")
        yield from paper_images(paths)
......@@ -153,31 +159,33 @@ def extract_images_tar(tar):
if suffix in ALLOWED_EXTS:
with PIL.Image.open(tar.extractfile(member)) as image:
yield convert_to_rgb(image)
elif suffix == 'pdf':
image = pdf2image.convert_from_bytes(tar.extractfile(member).read(),
fmt='jpg')[0]
elif suffix == "pdf":
image = pdf2image.convert_from_bytes(
tar.extractfile(member).read(), fmt="jpg"
)[0]
yield convert_to_rgb(image)
elif suffix in IGNORE_EXTS:
pass
else:
logging.warning(f'Unknown extension {suffix}')
logging.warning(f"Unknown extension {suffix}")
def extract_images(response):
    """Yield all images of a preprint from its HTTP *response*.

    Dispatches on the Content-Type header: plain PDF submissions go
    through ``extract_images_pdf``, tarball sources through
    ``extract_images_tar``. Bare TeX sources (``application/x-eprint``)
    contain no images and are silently skipped; anything else is logged.
    """
    content_type = response.headers["Content-Type"]
    if content_type == "application/pdf":
        yield from extract_images_pdf(response.content)
    elif content_type == "application/x-eprint-tar":
        with tarfile.open(fileobj=BytesIO(response.content)) as tar:
            yield from extract_images_tar(tar)
    elif content_type != "application/x-eprint":
        logging.warning(f"Unknown content type {content_type}")
# + [markdown] Collapsed="false"
# # Colormap detection

# + Collapsed="false"
# Sample the jet colormap densely and index the RGB points in a KD-tree,
# so that "is this pixel jet-colored?" becomes a nearest-neighbor query.
jet_fn = cm.get_cmap("jet")
jet_pts = [jet_fn(x) for x in np.linspace(0, 1, 400)]
jet_pts = np.array(jet_pts)[:, :3]  # drop the alpha channel
JET_TREE = cKDTree(jet_pts)
......@@ -185,7 +193,7 @@ JET_TREE = cKDTree(jet_pts)
def points_near_jet(img, r=0.2, plot=False):
# Represent image as float so we can perform division
img = np.array(img, dtype='float').reshape(-1, 3)
img = np.array(img, dtype="float").reshape(-1, 3)
img /= 255 # Map colors into a unit cube.
img_tree = cKDTree(img, balanced_tree=False, compact_nodes=False)
pts = [res for neighs in JET_TREE.query_ball_tree(img_tree, r) for res in neighs]
......@@ -193,10 +201,11 @@ def points_near_jet(img, r=0.2, plot=False):
if plot:
from mpl_toolkits.mplot3d import Axes3D
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
ax.plot(*jet_pts.T, lw=10, label='Jet colormap')
ax.scatter(*img.T, s=5, c='C1', label='RGB points of figure')
ax = fig.add_subplot(111, projection="3d")
ax.plot(*jet_pts.T, lw=10, label="Jet colormap")
ax.scatter(*img.T, s=5, c="C1", label="RGB points of figure")
plt.legend()
plt.show()
......@@ -213,14 +222,14 @@ def label_image(pts, h, w):
def contour_mask(h, w, cnt):
    """Return the (rows, cols) indices of pixels inside contour *cnt*.

    Rasterizes the filled contour onto an empty ``h`` x ``w`` canvas and
    returns the nonzero pixel coordinates (color channel axis dropped).
    """
    empty_img = np.zeros((h, w, 3))
    mask = cv2.drawContours(empty_img, [cnt], 0, (0, 1, 0), -1)
    mask = np.nonzero(mask)
    return mask[:2]
def bounding_rect_slice(cnt):
    """Return (row_slice, col_slice) covering the bounding box of *cnt*."""
    # NOTE(review): original called `cv.boundingRect`, but every other
    # call site in this file uses the name `cv2` — `cv` looks like a
    # NameError waiting to happen, so it is fixed to `cv2` here.
    x, y, w, h = cv2.boundingRect(cnt)
    return slice(y, y + h), slice(x, x + w)
def contains_jet(pts, r_max):
......@@ -238,8 +247,8 @@ todays_ids = preprint_ids()
# Nothing new today: stop before touching the network or Twitter.
if not todays_ids:
    sys.exit()

today = date.today().isoformat().replace("-", "")  # e.g. "20200131"

# + Collapsed="false"
jet_papers = []
......@@ -252,9 +261,7 @@ for paper_id in tqdm.tqdm(todays_ids):
candidates = points_near_jet(resized_img, 0.2)
labeled_image = label_image(candidates, 200, 200)
contours, hierarchy = cv2.findContours(
labeled_image,
cv2.RETR_TREE,
cv2.CHAIN_APPROX_SIMPLE
labeled_image, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE
)
contours = [cnt for cnt in contours if cv2.contourArea(cnt) > 100]
masks = [contour_mask(200, 200, cnt) for cnt in contours]
......@@ -271,25 +278,14 @@ for paper_id in tqdm.tqdm(todays_ids):
# + [markdown] Collapsed="false"
# # Tweet stuff

# + Collapsed="false"
# Authenticate to Twitter; credentials come from the environment (set by
# the CI pipeline, never hard-coded here).
auth = tweepy.OAuthHandler(os.environ["API_KEY"], os.environ["API_SECRET_KEY"])
auth.set_access_token(os.environ["ACCESS_TOKEN"], os.environ["ACCESS_TOKEN_SECRET"])

# Create API object
api = tweepy.API(auth)

# + Collapsed="false"
MAX_TWEET_LENGTH = 280
# Twitter wraps every link in t.co, so each URL costs a fixed 23 chars.
URL_LENGTH = 23
......@@ -299,68 +295,65 @@ URL_LENGTH = 23
# Status phrases keyed by the jet-paper count threshold: for n detected
# papers, the first key >= n selects the pool of phrases to choose from.
evaluations = {
    0: [
        "NO JET USAGE DETECTED☮",
        "MUST STAY VIGILANT",
        "THIS MARKS THE BEGINNING OF A NEW ERA OF PEACE AND PROSPERITY",
    ],
    1: ["VIOLATION REPORTED", "ISSUING A WARNING NOTICE"],
    2: ["UPDATING PLOT CRIMINAL RECORDS", "DISPATCHING CAM02-UCS UNITS"],
    3: [
        "PATROL UNITS SUSTAINED NO PERMANENT DAMAGE",
        "AREA QUARANTINED",
        "VISUAL CONTACT ESTABLISHED",
    ],
    4: [
        "SUSPECTED COORDINATED JET ATTACK",
        "WARNING: JET OVERFLOW IMMINENT",
        "VISCM DETECTOR DAMAGED",
    ],
    5: ["PLOT CRIME RUNS RAMPANT", "REQUESTING IMMEDIATE BACKUP"],
    # Catch-all so that any count finds a matching threshold.
    np.inf: [
        "THERE ARE TOO MANY OF THEM",
        "EXTREME JET CONTAMINATION, SEEK SHELTER",
        "DECLARING GLOBAL STATE OF PLOT EMERGENCY",
    ],
}
# + Collapsed="false"
# Limit the number of preprints to 14, so that we don't exceed the max tweet length.
status_report = jinja2.Template(
    """arXiv/cond-mat STATUS REPORT {{date.strftime('%d/%m/%Y')}}
🚨🚨 JET DETECTED IN {{ jet_papers | length }} / {{ todays_ids | length }} PREPRINTS 🚨🚨{{ ': ' if jet_papers else '.' }}
{{- paper_links }}
{{ description }}
"""
)
# + Collapsed="false"
def prepare_tweet(todays_ids, jet_papers):
status_options = next(
options
for number, options in evaluations.items()
if number >= len(jet_papers)
options for number, options in evaluations.items() if number >= len(jet_papers)
)
description = random.choice(status_options)
# Compute how much space we have remaining
bare_length = len(
status_report.render(date=date.today(), jet_papers=jet_papers, description=description,
todays_ids=todays_ids)
status_report.render(
date=date.today(),
jet_papers=jet_papers,
description=description,
todays_ids=todays_ids,
)
)
remaining_chars = MAX_TWEET_LENGTH - bare_length
separator = ', '
remaining_links = (remaining_chars + len(separator)) // (URL_LENGTH + len(separator))
separator = ", "
remaining_links = (remaining_chars + len(separator)) // (
URL_LENGTH + len(separator)
)
if len(jet_papers) <= max(4, remaining_links):
# We may link to all papers
paper_links = separator.join(pdf_url(paper_id) for paper_id in jet_papers)
......@@ -368,24 +361,19 @@ def prepare_tweet(todays_ids, jet_papers):
# We just list the preprint ids.
remaining_papers = (remaining_chars + len(separator)) // (9 + len(separator))
if len(jet_papers) > remaining_papers:
paper_links = separator.join(jet_papers[: remaining_papers - 1] + ['…'])
paper_links = separator.join(jet_papers[: remaining_papers - 1] + ["…"])
else:
paper_links = separator.join(jet_papers[:remaining_papers])
return status_report.render(
date=date.today(), jet_papers=jet_papers, description=description, paper_links=paper_links,
todays_ids=todays_ids
date=date.today(),
jet_papers=jet_papers,
description=description,
paper_links=paper_links,
todays_ids=todays_ids,
)
# + Collapsed="false"
tweet = prepare_tweet(todays_ids, jet_papers)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment