Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • zesje/zesje
  • jbweston/grader_app
  • dj2k/zesje
  • MrHug/zesje
  • okaaij/zesje
  • tsoud/zesje
  • pimotte/zesje
  • works-on-my-machine/zesje
  • labay11/zesje
  • reouvenassouly/zesje
  • t.v.aerts/zesje
  • giuseppe.deininger/zesje
12 results
Show changes
File added
import pytest
from flask import Flask
from zesje.database import db, Exam, _generate_exam_token
from zesje.database import db, _generate_exam_token, Exam, Problem, ProblemWidget, Solution
from zesje.database import Submission, Scan, Page, ExamWidget, FeedbackOption
@pytest.mark.parametrize('duplicate_count', [
......@@ -32,3 +33,141 @@ def test_exam_generate_token_length_uppercase(duplicate_count, monkeypatch):
id = _generate_exam_token()
assert len(id) == 12
assert id.isupper()
def test_cascades_exam(empty_app, exam, problem, submission, scan, exam_widget):
    """Check that deleting an exam also deletes everything attached to it.

    The following relations are exercised:
    - Exam -> Submission
    - Exam -> Problem
    - Exam -> Scan
    - Exam -> ExamWidget
    """
    empty_app.app_context().push()

    # Attach one child of every kind to the exam before persisting it.
    exam.problems = [problem]
    exam.scans = [scan]
    exam.submissions = [submission]
    exam.widgets = [exam_widget]

    db.session.add(exam)
    db.session.commit()

    # Adding the exam must have pulled every child into the session.
    for child in (problem, submission, scan, exam_widget):
        assert child in db.session

    db.session.delete(exam)
    db.session.commit()

    # After the exam is gone, none of its children may remain.
    for child in (problem, submission, scan, exam_widget):
        assert child not in db.session
def test_cascades_problem(
        empty_app, exam, problem, submission, solution, problem_widget, feedback_option):
    """Check that deleting a problem also deletes everything attached to it.

    The following relations are exercised:
    - Problem -> Solution
    - Problem -> ProblemWidget
    - Problem -> FeedbackOption
    """
    empty_app.app_context().push()

    # Build the object graph: the exam owns the problem and the submission,
    # the problem owns the widget, the solution and the feedback option.
    exam.problems = [problem]
    exam.submissions = [submission]
    solution.submission = submission
    problem.widget = problem_widget
    problem.solutions = [solution]
    problem.feedback_options = [feedback_option]

    db.session.add_all([exam, problem, submission])
    db.session.commit()

    for child in (solution, problem_widget, feedback_option):
        assert child in db.session

    db.session.delete(problem)
    db.session.commit()

    for child in (solution, problem_widget, feedback_option):
        assert child not in db.session
def test_cascades_submission(empty_app, exam, problem, submission, solution, page):
    """Check that deleting a submission also deletes everything attached to it.

    The following relations are exercised:
    - Submission -> Solution
    - Submission -> Page
    """
    empty_app.app_context().push()

    # The solution and the page are only reachable through the submission
    # (and, for the solution, the problem); they are never added explicitly.
    exam.problems = [problem]
    exam.submissions = [submission]
    solution.problem = problem
    solution.submission = submission
    page.submission = submission

    db.session.add_all([exam, problem, submission])
    db.session.commit()

    for child in (solution, page):
        assert child in db.session

    db.session.delete(submission)
    db.session.commit()

    for child in (solution, page):
        assert child not in db.session
@pytest.fixture
def exam():
    """Provide a fresh Exam instance with an empty name."""
    new_exam = Exam(name='')
    return new_exam
@pytest.fixture
def problem():
    """Provide a fresh Problem instance with an empty name."""
    new_problem = Problem(name='')
    return new_problem
@pytest.fixture
def problem_widget():
    """Provide a fresh ProblemWidget with an empty name and all-zero geometry."""
    geometry = dict(page=0, x=0, y=0, width=0, height=0)
    return ProblemWidget(name='', **geometry)
@pytest.fixture
def exam_widget():
    """Provide a fresh ExamWidget with an empty name placed at the origin."""
    new_widget = ExamWidget(name='', x=0, y=0)
    return new_widget
@pytest.fixture
def submission():
    """Provide a fresh Submission instance with copy number 0."""
    new_submission = Submission(copy_number=0)
    return new_submission
@pytest.fixture
def solution():
    """Provide a fresh Solution instance constructed without arguments."""
    new_solution = Solution()
    return new_solution
@pytest.fixture
def scan():
    """Provide a fresh Scan instance with an empty name and empty status."""
    new_scan = Scan(name='', status='')
    return new_scan
@pytest.fixture
def page():
    """Provide a fresh Page instance with an empty path and page number 0."""
    new_page = Page(path='', number=0)
    return new_page
@pytest.fixture
def feedback_option():
    """Provide a fresh FeedbackOption instance with empty text."""
    new_option = FeedbackOption(text='')
    return new_option
......@@ -3,8 +3,6 @@ import pytest
from PIL import Image
import numpy as np
from zesje import pregrader
from zesje import scans
from zesje import images
directory_name = "checkboxes"
......@@ -17,34 +15,27 @@ def scanned_image(datadir):
return image
@pytest.fixture
def scanned_image_keypoints(scanned_image):
corner_markers = scans.find_corner_marker_keypoints(scanned_image)
fixed_corner_keypoints = images.fix_corner_markers(corner_markers, scanned_image.shape)
return fixed_corner_keypoints
@pytest.mark.parametrize('box_coords, result', [((346, 479), True), ((370, 479), False), ((393, 479), True),
((416, 479), True), ((439, 479), True), ((155, 562), True)],
ids=["1 filled", "2 empty", "3 marked with line", "4 completely filled",
"5 marked with an x", "e marked with a cirle inside"])
def test_ideal_crops(box_coords, result, scanned_image_keypoints, scanned_image):
assert pregrader.box_is_filled(box_coords, scanned_image, scanned_image_keypoints[0]) == result
def test_ideal_crops(box_coords, result, scanned_image):
assert pregrader.box_is_filled(box_coords, scanned_image, cut_padding=0.1, box_size=9) == result
@pytest.mark.parametrize('box_coords, result', [((341, 471), True), ((352, 482), True), ((448, 482), True),
((423, 474), True), ((460, 475), False), ((477, 474), True),
((87, 544), False)],
((87, 556), False)],
ids=["1 filled bottom right", "1 filled top left", "5 filled with a bit of 6",
"4 fully filled with the label", "6 empty with label",
"7 partially cropped, filled and a part of 6", "B empty with cb at the bottom"])
def test_shifted_crops(box_coords, result, scanned_image_keypoints, scanned_image):
assert pregrader.box_is_filled(box_coords, scanned_image, scanned_image_keypoints[0]) == result
def test_shifted_crops(box_coords, result, scanned_image):
assert pregrader.box_is_filled(box_coords, scanned_image, cut_padding=0.1, box_size=9) == result
@pytest.mark.parametrize('box_coords, result', [((60, 562), True), ((107, 562), True),
((131, 562), False)],
ids=["A filled with trailing letter", "C filled with letters close",
"D blank with trailing letter"])
def test_trailing_text(box_coords, result, scanned_image_keypoints, scanned_image):
assert pregrader.box_is_filled(box_coords, scanned_image, scanned_image_keypoints[0]) == result
def test_trailing_text(box_coords, result, scanned_image):
assert pregrader.box_is_filled(box_coords, scanned_image, cut_padding=0.1, box_size=9) == result
......@@ -8,6 +8,7 @@ from tempfile import NamedTemporaryFile
from flask import Flask
from io import BytesIO
import wand.image
from pikepdf import Pdf
from zesje.scans import decode_barcode, ExamMetadata, ExtractedBarcode
from zesje.database import db, _generate_exam_token
......@@ -277,6 +278,24 @@ def test_all_effects(
assert success is expected, reason
@pytest.mark.parametrize('filename,expected', [
    ['blank-a4-2pages.pdf', AttributeError],
    ['single-image-a4.pdf', ValueError],
    ['two-images-a4.pdf', ValueError],
    ['flattened-a4-2pages.pdf', None]],
    ids=['blank pdf', 'single image', 'two images', 'flattened pdf'])
def test_image_extraction_pike(datadir, filename, expected):
    """Run pikepdf image extraction on every page of a sample PDF.

    When `expected` is an exception type, extraction of each page must raise
    it; when `expected` is None, each page must yield an image.
    """
    pdf_path = os.path.join(datadir, filename)

    with Pdf.open(pdf_path) as pdf_reader:
        page_count = len(pdf_reader.pages)
        for pagenr in range(page_count):
            if expected is None:
                # Happy path: extraction must succeed and return something.
                extracted = scans.extract_image_pikepdf(pagenr, pdf_reader)
                assert extracted is not None
                continue

            with pytest.raises(expected):
                scans.extract_image_pikepdf(pagenr, pdf_reader)
@pytest.mark.parametrize('filename', [
'blank-a4-2pages.pdf',
'flattened-a4-2pages.pdf'],
......
......@@ -9,9 +9,9 @@ from flask_restful import Resource, reqparse
from werkzeug.datastructures import FileStorage
from sqlalchemy.orm import selectinload
from ..pdf_generation import generate_pdfs, output_pdf_filename_format, join_pdfs, page_is_size, make_pages_even
from ..pdf_generation import PAGE_FORMATS
from ..database import db, Exam, ExamWidget, Submission
from ..pdf_generation import generate_pdfs, output_pdf_filename_format, join_pdfs
from ..pdf_generation import page_is_size, make_pages_even, PAGE_FORMATS
from ..database import db, Exam, ExamWidget, Submission, token_length
def _get_exam_dir(exam_id):
......@@ -65,19 +65,7 @@ class Exams(Resource):
elif Submission.query.filter(Submission.exam_id == exam.id).count():
return dict(status=500, message='Exam is not finalized but already has submissions.'), 500
else:
# Delete any scans that were wrongly uploaded to this exam
for scan in exam.scans:
db.session.delete(scan)
for widget in exam.widgets:
db.session.delete(widget)
for problem in exam.problems:
for fb_option in problem.feedback_options:
db.session.delete(fb_option)
db.session.delete(problem.widget)
db.session.delete(problem)
# All corresponding solutions, scans and problems are automatically deleted
db.session.delete(exam)
db.session.commit()
......@@ -531,8 +519,8 @@ class ExamPreview(Resource):
cb_data = get_cb_data_for_exam(exam)
generate_pdfs(
exam_path,
exam.token[:5] + 'PREVIEW',
[1519],
"A" * token_length,
[1559],
[output_file],
student_id_widget.x, student_id_widget.y,
barcode_widget.x, barcode_widget.y,
......
......@@ -3,8 +3,9 @@ from flask import abort, Response
import numpy as np
import cv2
from ..images import get_box
from ..database import Exam, Submission, Problem, Page
from ..images import get_box, guess_dpi
from ..database import Exam, Submission, Problem, Page, Solution
from ..pdf_generation import CHECKBOX_FORMAT
def get(exam_id, problem_id, submission_id, full_page=False):
......@@ -56,6 +57,23 @@ def get(exam_id, problem_id, submission_id, full_page=False):
page_path = page.path
page_im = cv2.imread(page_path)
# pregrade highlighting
solution = Solution.query.filter(Solution.submission_id == sub.id,
Solution.problem_id == problem_id).one_or_none()
if solution is not None:
dpi = guess_dpi(page_im)
fb = list(map(lambda x: x.id, solution.feedback))
for option in problem.mc_options:
if option.feedback_id in fb:
x = int(option.x / 72 * dpi)
y = int(option.y / 72 * dpi)
box_length = int(CHECKBOX_FORMAT["box_size"] / 72 * dpi)
x1 = x + box_length
y1 = y + box_length
page_im = cv2.rectangle(page_im, (x, y), (x1, y1), (0, 255, 0), 3)
if not full_page:
raw_image = get_box(page_im, widget_area_in, padding=0.3)
else:
......
""" REST api for problems """
from flask_restful import Resource, reqparse
from flask_restful import Resource, reqparse, current_app
from ..database import db, Exam, Problem, ProblemWidget, Solution
from zesje.pdf_reader import get_problem_title
class Problems(Resource):
""" List of problems associated with a particular exam_id """
......@@ -58,11 +60,18 @@ class Problems(Resource):
db.session.commit()
widget.name = f'problem_{problem.id}'
app_config = current_app.config
data_dir = app_config.get('DATA_DIRECTORY', 'data')
page_format = app_config.get('PAGE_FORMAT', 'A4')
problem.name = get_problem_title(problem, data_dir, page_format)
db.session.commit()
return {
'id': problem.id,
'widget_id': widget.id,
'problem_name': problem.name
}
put_parser = reqparse.RequestParser()
......@@ -105,13 +114,11 @@ class Problems(Resource):
if any([sol.graded_by is not None for sol in problem.solutions]):
return dict(status=403, message=f'Problem has already been graded'), 403
else:
# Delete all solutions associated with this problem
for sol in problem.solutions:
db.session.delete(sol)
# Delete all multiple choice options associated with this problem
# delete mc options
for mc_option in problem.mc_options:
db.session.delete(mc_option)
db.session.delete(problem.widget)
# The widget and all associated solutions are automatically deleted
db.session.delete(problem)
db.session.commit()
return dict(status=200, message="ok"), 200
......@@ -186,4 +186,6 @@ class Approve(Resource):
solution.graded_at = datetime.now()
solution.graded_by = grader
db.session.commit()
return {'state': graded}
......@@ -63,10 +63,11 @@ class Exam(db.Model):
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(Text, nullable=False)
token = Column(String(token_length), unique=True, default=_generate_exam_token)
submissions = db.relationship('Submission', backref='exam', lazy=True)
problems = db.relationship('Problem', backref='exam', order_by='Problem.id', lazy=True)
scans = db.relationship('Scan', backref='exam', lazy=True)
widgets = db.relationship('ExamWidget', backref='exam', order_by='ExamWidget.id', lazy=True)
submissions = db.relationship('Submission', backref='exam', cascade='all', lazy=True)
problems = db.relationship('Problem', backref='exam', cascade='all', order_by='Problem.id', lazy=True)
scans = db.relationship('Scan', backref='exam', cascade='all', lazy=True)
widgets = db.relationship('ExamWidget', backref='exam', cascade='all',
order_by='ExamWidget.id', lazy=True)
finalized = Column(Boolean, default=False, server_default='f')
......@@ -76,8 +77,9 @@ class Submission(db.Model):
id = Column(Integer, primary_key=True, autoincrement=True)
copy_number = Column(Integer, nullable=False)
exam_id = Column(Integer, ForeignKey('exam.id'), nullable=False)
solutions = db.relationship('Solution', backref='submission', order_by='Solution.problem_id', lazy=True)
pages = db.relationship('Page', backref='submission', lazy=True)
solutions = db.relationship('Solution', backref='submission', cascade='all',
order_by='Solution.problem_id', lazy=True)
pages = db.relationship('Page', backref='submission', cascade='all', lazy=True)
student_id = Column(Integer, ForeignKey('student.id'), nullable=True)
signature_validated = Column(Boolean, default=False, server_default='f', nullable=False)
......@@ -97,9 +99,10 @@ class Problem(db.Model):
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(Text, nullable=False)
exam_id = Column(Integer, ForeignKey('exam.id'), nullable=False)
feedback_options = db.relationship('FeedbackOption', backref='problem', order_by='FeedbackOption.id', lazy=True)
solutions = db.relationship('Solution', backref='problem', lazy=True)
widget = db.relationship('ProblemWidget', backref='problem', uselist=False, lazy=True)
feedback_options = db.relationship('FeedbackOption', backref='problem', cascade='all',
order_by='FeedbackOption.id', lazy=True)
solutions = db.relationship('Solution', backref='problem', cascade='all', lazy=True)
widget = db.relationship('ProblemWidget', backref='problem', cascade='all', uselist=False, lazy=True)
@hybrid_property
def mc_options(self):
......
......@@ -8,10 +8,12 @@ from email.mime.base import MIMEBase
from email import encoders
import jinja2
from wand.image import Image
from reportlab.pdfgen import canvas
from .database import Submission
from . import statistics
from .api.exams import PAGE_FORMATS
def solution_pdf(exam_id, student_id):
......@@ -20,17 +22,17 @@ def solution_pdf(exam_id, student_id):
pages = sorted((p for s in subs for p in s.pages), key=(lambda p: p.number))
pages = [p.path for p in pages]
with Image() as output_pdf:
for filepath in pages:
with Image(filename=filepath) as page:
output_pdf.sequence.append(page)
output_pdf.format = 'pdf'
result = BytesIO()
from flask import current_app
page_format = current_app.config.get('PAGE_FORMAT', 'A4') # TODO Remove default value
page_size = PAGE_FORMATS[page_format]
output_pdf.save(file=result)
result = BytesIO()
pdf = canvas.Canvas(result, pagesize=page_size)
for page in pages:
pdf.drawImage(page, 0, 0, width=page_size[0], height=page_size[1])
pdf.showPage()
pdf.save()
result.seek(0)
return result
......
......@@ -33,8 +33,8 @@ def create_app():
)
app.config.update(
CELERY_BROKER_URL='redis://localhost:6379',
CELERY_RESULT_BACKEND='redis://localhost:6379'
CELERY_BROKER_URL='redis://localhost:6479',
CELERY_RESULT_BACKEND='redis://localhost:6479'
)
db.init_app(app)
......
......@@ -7,7 +7,7 @@ from operator import sub, add
def guess_dpi(image_array):
h, *_ = image_array.shape
resolutions = np.array([1200, 600, 400, 300, 200, 150, 120, 100, 75, 72, 60, 50, 40])
resolutions = np.array([1200, 600, 400, 300, 200, 150, 144, 120, 100, 75, 72, 60, 50, 40])
return resolutions[np.argmin(abs(resolutions - 25.4 * h / 297))]
......@@ -60,7 +60,7 @@ def fix_corner_markers(corner_keypoints, shape):
return corner_keypoints
if len(corner_keypoints) < 3:
raise RuntimeError("Fewer then 3 corner markers found")
raise RuntimeError("Fewer than 3 corner markers found while trying to fix corners")
x_sep = shape[1] / 2
y_sep = shape[0] / 2
......
from io import BytesIO
from tempfile import NamedTemporaryFile
import PIL
from pdfrw import PdfReader, PdfWriter, PageMerge
from pystrich.datamatrix import DataMatrixEncoder
from pylibdmtx.pylibdmtx import encode
from reportlab.lib.units import mm
from reportlab.pdfgen import canvas
......@@ -18,6 +17,12 @@ MARKER_FORMAT = {
"bar_length": 40 * mm
}
# the parameters of drawing checkboxes
CHECKBOX_FORMAT = {
"margin": 5,
"font_size": 11,
"box_size": 9
}
PAGE_FORMATS = {
"A4": (595.276, 841.89),
"US letter": (612, 792),
......@@ -29,9 +34,9 @@ def generate_pdfs(exam_pdf_file, exam_id, copy_nums, output_paths, id_grid_x,
"""
Generate the final PDFs from the original exam PDF.
To maintain a consistent size of the DataMatrix codes, adhere to (# of
letters in exam ID) + 2 * (# of digits in exam ID) = C for a certain
constant C. The reason for this is that pyStrich encodes two digits in as
To ensure the page information fits into the datamatrix grid, adhere to
(# of letters in exam ID) + 2 * (# of digits in exam ID) = C for a certain
constant C. The reason for this is that libdmtx encodes two digits in as
much space as one letter.
If maximum interchangeability with version 1 QR codes is desired (error
......@@ -182,29 +187,26 @@ def generate_checkbox(canvas, x, y, label):
A string representing the label that is drawn on top of the box, will only take the first character
"""
fontsize = 11 # Size of font
margin = 5 # Margin between elements and sides
markboxsize = fontsize - 2 # Size of checkboxes boxes
x_label = x + 1 # location of the label
y_label = y + margin # remove fontsize from the y label since we draw from the bottom left up
box_y = y - markboxsize # remove the markboxsize because the y is the coord of the top
y_label = y + CHECKBOX_FORMAT["margin"] # remove fontsize from the y label since we draw from the bottom left up
box_y = y - CHECKBOX_FORMAT["box_size"] # remove the markboxsize because the y is the coord of the top
# and reportlab prints from the bottom
# check that there is a label to print
if (label and not (len(label) == 0)):
canvas.setFont('Helvetica', fontsize)
canvas.setFont('Helvetica', CHECKBOX_FORMAT["font_size"])
canvas.drawString(x_label, y_label, label[0])
canvas.rect(x, box_y, markboxsize, markboxsize)
canvas.rect(x, box_y, CHECKBOX_FORMAT["box_size"], CHECKBOX_FORMAT["box_size"])
def generate_datamatrix(exam_id, page_num, copy_num):
"""
Generates a DataMatrix code to be used on a page.
To maintain a consistent size of the DataMatrix codes, adhere to (# of
letters in exam ID) + 2 * (# of digits in exam ID) = C for a certain
constant C. The reason for this is that pyStrich encodes two digits in as
To ensure the page information fits into the datamatrix grid, adhere to
(# of letters in exam ID) + 2 * (# of digits in exam ID) = C for a certain
constant C. The reason for this is that pylibdmtx encodes two digits in as
much space as one letter.
If maximum interchangeability with version 1 QR codes is desired (error
......@@ -229,8 +231,10 @@ def generate_datamatrix(exam_id, page_num, copy_num):
data = f'{exam_id}/{copy_num:04d}/{page_num:02d}'
image_bytes = DataMatrixEncoder(data).get_imagedata(cellsize=2)
return PIL.Image.open(BytesIO(image_bytes))
encoded = encode(data.encode('utf-8'), size='18x18')
datamatrix = PIL.Image.frombytes('RGB', (encoded.width, encoded.height), encoded.pixels)
datamatrix = datamatrix.resize((44, 44)).convert('L')
return datamatrix
def _generate_overlay(canv, pagesize, exam_id, copy_num, num_pages, id_grid_x,
......@@ -239,9 +243,9 @@ def _generate_overlay(canv, pagesize, exam_id, copy_num, num_pages, id_grid_x,
Generates an overlay ('watermark') PDF, which can then be overlaid onto
the exam PDF.
To maintain a consistent size of the DataMatrix codes in the overlay,
To ensure the page information fits into the datamatrix grid in the overlay,
adhere to (# of letters in exam ID) + 2 * (# of digits in exam ID) = C for
a certain constant C. The reason for this is that pyStrich encodes two
a certain constant C. The reason for this is that pylibdmtx encodes two
digits in as much space as one letter.
If maximum interchangeability with version 1 QR codes is desired (error
......@@ -273,10 +277,6 @@ def _generate_overlay(canv, pagesize, exam_id, copy_num, num_pages, id_grid_x,
"""
# Font settings for the copy number (printed under the datamatrix)
fontsize = 8
canv.setFont('Helvetica', fontsize)
# transform y-coordinate to a different origin location
id_grid_y = pagesize[1] - id_grid_y
......@@ -293,6 +293,9 @@ def _generate_overlay(canv, pagesize, exam_id, copy_num, num_pages, id_grid_x,
else:
index = 0
max_index = 0
# Font settings for the copy number (printed under the datamatrix)
fontsize = 12
canv.setFont('Helvetica', fontsize)
for page_num in range(num_pages):
_add_corner_markers_and_bottom_bar(canv, pagesize)
......@@ -304,7 +307,7 @@ def _generate_overlay(canv, pagesize, exam_id, copy_num, num_pages, id_grid_x,
canv.drawInlineImage(datamatrix, datamatrix_x, datamatrix_y_adjusted)
canv.drawString(
datamatrix_x, datamatrix_y_adjusted - fontsize,
datamatrix_x, datamatrix_y_adjusted - (fontsize * 0.66),
f" # {copy_num}"
)
......
import os
from pdfminer3.converter import PDFPageAggregator
from pdfminer3.layout import LAParams
from pdfminer3.layout import LTFigure
from pdfminer3.layout import LTTextBoxHorizontal
from pdfminer3.pdfdocument import PDFDocument
from pdfminer3.pdfinterp import PDFResourceManager
from pdfminer3.pdfinterp import PDFPageInterpreter
from pdfminer3.pdfpage import PDFPage
from pdfminer3.pdfparser import PDFParser
from .api.exams import PAGE_FORMATS
def get_problem_title(problem, data_dir, page_format):
    """
    Returns the title of a problem

    The exam PDF is read from ``<data_dir>/<exam_id>_data/exam.pdf``. The
    title is the first line of the first text found by `get_words` in the
    vertical band that starts at the bottom of the problem widget directly
    above this one (or at the top of the page when there is none) and ends
    at the bottom of this problem's widget.

    Parameters
    ----------
    data_dir : str
        Location of the data folder
    page_format : str
        Format of the current page
    problem : Problem
        The currently selected problem

    Returns
    -------
    title: str
        The title of the problem, or an empty string if no text is found
    """
    pdf_path = os.path.join(data_dir, f'{problem.exam_id}_data', 'exam.pdf')

    # Use a context manager so the PDF file handle is always closed, also on
    # the early returns below and when parsing raises.
    with open(pdf_path, 'rb') as fp:
        parser = PDFParser(fp)
        document = PDFDocument(parser)
        rsrcmgr = PDFResourceManager()
        laparams = LAParams()
        device = PDFPageAggregator(rsrcmgr, laparams=laparams)
        interpreter = PDFPageInterpreter(rsrcmgr, device)

        # Get the other problems on the same page, ordered from top to bottom
        problems_on_page = [p for p in problem.exam.problems if p.widget.page == problem.widget.page]
        problems_on_page.sort(key=lambda prob: prob.widget.y)
        idx = problems_on_page.index(problem)

        # Determine y coordinates to search for text
        if idx == 0:
            y_above = 0
        else:
            problem_above = problems_on_page[idx - 1]
            y_above = problem_above.widget.y + problem_above.widget.height
        y_current = problem.widget.y + problem.widget.height

        for page in PDFPage.create_pages(document):
            interpreter.process_page(page)
            layout = device.get_result()

            # The page id of the layout is compared against the widget page + 1
            if layout.pageid == problem.widget.page + 1:
                filtered_words = get_words(layout._objs, y_above, y_current, page_format)
                if not filtered_words:
                    return ''

                # Only the first line of the first matching text is the title
                lines = filtered_words[0].split('\n')
                return lines[0]

    return ''
def get_words(layout_objs, y_top, y_bottom, page_format):
    """
    Returns the text from a pdf page within a specified height.

    Pdfminer orients the coordinates of a layout object from
    the bottom left, whereas `y_top` and `y_bottom` are measured from the
    top of the page; they are converted using the page height.

    Adapted from https://github.com/euske/pdfminer/issues/171

    obj.bbox returns the following values: (x0, y0, x1, y1)
    With
    x0: the distance from the left of the page to the left edge of the box.
    y0: the distance from the bottom of the page to the lower edge of the box.
    x1: the distance from the left of the page to the right edge of the box.
    y1: the distance from the bottom of the page to the upper edge of the box.

    A horizontal text box is selected when its lower edge (y0) lies strictly
    between the two limits. Figures are searched recursively and their
    matching texts are merged into the same flat result list.

    Parameters
    ----------
    page_format : str
        Format of the current page, a key of PAGE_FORMATS
    layout_objs : list of layout objects
        The list of objects in the page.
    y_top : double
        Upper limit of the search band, measured from the top of the page
    y_bottom : double
        Lower limit of the search band, measured from the top of the page

    Returns
    -------
    words : list of str
        The texts of the matching text boxes, in the order they were found.
    """
    page_height = PAGE_FORMATS[page_format][1]

    # Convert the band limits to pdfminer's bottom-left based coordinates
    upper_limit = page_height - y_top
    lower_limit = page_height - y_bottom

    words = []
    for obj in layout_objs:
        if isinstance(obj, LTTextBoxHorizontal):
            if upper_limit > obj.bbox[1] > lower_limit:
                words.append(obj.get_text())
        elif isinstance(obj, LTFigure):
            # extend instead of append: appending would nest the recursive
            # result as a list inside `words` (an empty list for figures
            # without matching text), while callers expect plain strings.
            words.extend(get_words(obj._objs, y_top, y_bottom, page_format))
    return words
......@@ -2,10 +2,11 @@ import cv2
import numpy as np
from .database import db, Solution
from .images import guess_dpi, get_box, fix_corner_markers
from .images import guess_dpi, get_box
from .pdf_generation import CHECKBOX_FORMAT
def add_feedback_to_solution(sub, exam, page, page_img, corner_keypoints):
def add_feedback_to_solution(sub, exam, page, page_img):
"""
Adds the multiple choice options that are identified as marked as a feedback option to a solution
......@@ -17,30 +18,22 @@ def add_feedback_to_solution(sub, exam, page, page_img, corner_keypoints):
the current exam
page_img : Image
image of the page
corner_keypoints : array
locations of the corner keypoints as (x, y) tuples
"""
problems_on_page = [problem for problem in exam.problems if problem.widget.page == page]
fixed_corner_keypoints = fix_corner_markers(corner_keypoints, page_img.shape)
x_min = min(point[0] for point in fixed_corner_keypoints)
y_min = min(point[1] for point in fixed_corner_keypoints)
top_left_point = (x_min, y_min)
for problem in problems_on_page:
sol = Solution.query.filter(Solution.problem_id == problem.id, Solution.submission_id == sub.id).one_or_none()
for mc_option in problem.mc_options:
box = (mc_option.x, mc_option.y)
if box_is_filled(box, page_img, top_left_point):
if box_is_filled(box, page_img, box_size=CHECKBOX_FORMAT["box_size"]):
feedback = mc_option.feedback
sol.feedback.append(feedback)
db.session.commit()
def box_is_filled(box, page_img, corner_keypoints, marker_margin=72/2.54, threshold=225, cut_padding=0.1, box_size=11):
def box_is_filled(box, page_img, threshold=225, cut_padding=0.05, box_size=9):
"""
A function that finds the checkbox in a general area and then checks if it is filled in.
......@@ -50,12 +43,6 @@ def box_is_filled(box, page_img, corner_keypoints, marker_margin=72/2.54, thresh
The coordinates of the top left (x,y) of the checkbox in points.
page_img: np.array
A numpy array of the image scan
corner_keypoints: (float,float)
The x coordinate of the left markers and the y coordinate of the top markers,
used as point of reference since scans can deviate from the original.
(x,y) are both in pixels.
marker_margin: float
The margin between the corner markers and the edge of a page when generated.
threshold: int
the threshold needed for a checkbox to be considered marked range is between 0 (fully black)
and 255 (absolutely white).
......@@ -69,18 +56,12 @@ def box_is_filled(box, page_img, corner_keypoints, marker_margin=72/2.54, thresh
True if the box is marked, else False.
"""
# shouldn't be needed, but some images are drawn a bit weirdly
y_shift = 11
# create an array with y top, y bottom, x left and x right. use the marker margin to allign to the page.
coords = np.asarray([box[1] - marker_margin + y_shift, box[1] + box_size - marker_margin + y_shift,
box[0] - marker_margin, box[0] + box_size - marker_margin])/72
# create an array with y top, y bottom, x left and x right. And divide by 72 to get dimensions in inches.
coords = np.asarray([box[1], box[1] + box_size,
box[0], box[0] + box_size])/72
# add the actually margin from the scan to corner markers to the coords in inches
dpi = guess_dpi(page_img)
coords[0] = coords[0] + corner_keypoints[1]/dpi
coords[1] = coords[1] + corner_keypoints[1]/dpi
coords[2] = coords[2] + corner_keypoints[0]/dpi
coords[3] = coords[3] + corner_keypoints[0]/dpi
# get the box where we think the box is
cut_im = get_box(page_img, coords, padding=cut_padding)
......@@ -88,7 +69,7 @@ def box_is_filled(box, page_img, corner_keypoints, marker_margin=72/2.54, thresh
# convert to grayscale
gray_im = cv2.cvtColor(cut_im, cv2.COLOR_BGR2GRAY)
# apply threshold to only have black or white
_, bin_im = cv2.threshold(gray_im, 150, 255, cv2.THRESH_BINARY)
_, bin_im = cv2.threshold(gray_im, 160, 255, cv2.THRESH_BINARY)
h_bin, w_bin, *_ = bin_im.shape
# create a mask that gets applied when floodfill the white
......@@ -111,7 +92,6 @@ def box_is_filled(box, page_img, corner_keypoints, marker_margin=72/2.54, thresh
# if the rectangle is bigger (higher) than expected, cut the image up a bit
if h > 1.5 * box_size_px:
print("in h resize")
y_partition = 0.333
# try getting another bounding box on bottom 2/3 of the screen
coords2 = cv2.findNonZero(flood_im[y + int(y_partition * h): y + h, x: x+w])
......
......@@ -4,11 +4,12 @@ import math
import os
from collections import namedtuple, Counter
from io import BytesIO
from tempfile import SpooledTemporaryFile
import signal
import cv2
import numpy as np
import PyPDF2
from pikepdf import Pdf, PdfImage
from PIL import Image
from wand.image import Image as WandImage
from pylibdmtx import pylibdmtx
......@@ -80,7 +81,9 @@ def _process_pdf(scan_id, app_config):
report_error(f'Error while reading Exam metadata: {e}')
raise
total = PyPDF2.PdfFileReader(open(pdf_path, "rb")).getNumPages()
with Pdf.open(pdf_path) as pdf_reader:
total = len(pdf_reader.pages)
failures = []
try:
for image, page in extract_images(pdf_path):
......@@ -131,70 +134,55 @@ def exam_metadata(exam_id):
def extract_images(filename):
"""Yield all images from a PDF file.
Tries to use PyPDF2 to extract the images from the given PDF.
If PyPDF2 fails to open the PDF or PyPDF2 is not able to extract
a page, it continues to use Wand for the rest of the pages.
Tries to use PikePDF to extract the images from the given PDF.
If PikePDF is not able to extract the image from a page,
it continues to use Wand to flatten the rest of the pages.
"""
with open(filename, "rb") as file:
with Pdf.open(filename) as pdf_reader:
use_wand = False
pypdf_reader = None
wand_image = None
total = 0
try:
pypdf_reader = PyPDF2.PdfFileReader(file)
total = pypdf_reader.getNumPages()
except Exception:
# Fallback to Wand if opening the PDF with PyPDF2 failed
use_wand = True
if use_wand:
# If PyPDF2 failed we need Wand to count the number of pages
wand_image = WandImage(filename=filename, resolution=300)
total = len(wand_image.sequence)
total = len(pdf_reader.pages)
for pagenr in range(total):
if not use_wand:
try:
# Try to use PyPDF2, but catch any error it raises
img = extract_image_pypdf(pagenr, pypdf_reader)
# Try to use PikePDF, but catch any error it raises
img = extract_image_pikepdf(pagenr, pdf_reader)
except Exception:
# Fallback to Wand if extracting with PyPDF2 failed
# Fallback to Wand if extracting with PikePDF failed
use_wand = True
if use_wand:
if wand_image is None:
wand_image = WandImage(filename=filename, resolution=300)
img = extract_image_wand(pagenr, wand_image)
img = extract_image_wand(pagenr, pdf_reader)
if img.mode == 'L':
img = img.convert('RGB')
yield img, pagenr+1
if wand_image is not None:
wand_image.close()
def extract_image_pypdf(pagenr, reader):
def extract_image_pikepdf(pagenr, reader):
"""Extracts an image as an array from the designated page
This method uses PyPDF2 to extract the image and only works
when there is a single image present on the page.
This method uses PikePDF to extract the image and only works
when there is a single image present on the page with the
same aspect ratio as the page.
Raises an error if not exactly one image is found on the page
or the image filter is not `FlateDecode`.
We do not check for the actual size of the image on the page,
since this size depends on the draw instruction rather than
the embedded image object available to pikepdf.
Adapted from https://stackoverflow.com/a/34116472/2217463
Raises an error if not exactly one image is present or the image
does not have the same aspect ratio as the page.
Parameters
----------
pagenr : int
Page number to extract
reader : PyPDF2.PdfFileReader instance
The reader to read the page from
reader : pikepdf.Pdf instance
The pdf reader to read the page from
Returns
-------
......@@ -203,60 +191,71 @@ def extract_image_pypdf(pagenr, reader):
Raises
------
ValueError if not exactly one image is found on the page
NotImplementedError if the image filter is not `FlateDecode`
ValueError
if not exactly one image is found on the page or the image
does not have the same aspect ratio as the page
AttributeError
if no XObject or MediaBox is present on the page
"""
page = reader.getPage(pagenr)
xObject = page['/Resources']['/XObject'].getObject()
page = reader.pages[pagenr]
xObject = page.Resources.XObject
if sum((xObject[obj]['/Subtype'] == '/Image')
if sum((xObject[obj].Subtype == '/Image')
for obj in xObject) != 1:
raise ValueError
raise ValueError('Not exactly 1 image present on the page')
for obj in xObject:
if xObject[obj]['/Subtype'] == '/Image':
data = xObject[obj].getData()
filter = xObject[obj]['/Filter']
if xObject[obj].Subtype == '/Image':
pdfimage = PdfImage(xObject[obj])
if filter == '/FlateDecode':
size = (xObject[obj]['/Width'], xObject[obj]['/Height'])
if xObject[obj]['/ColorSpace'] == '/DeviceRGB':
mode = "RGB"
else:
mode = "P"
img = Image.frombytes(mode, size, data)
else:
raise NotImplementedError
pdf_width = float(page.MediaBox[2] - page.MediaBox[0])
pdf_height = float(page.MediaBox[3] - page.MediaBox[1])
return img
ratio_width = pdfimage.width / pdf_width
ratio_height = pdfimage.height / pdf_height
# Check if the aspect ratio of the image is the same as the
# aspect ratio of the page up to a 3% relative error
if abs(ratio_width - ratio_height) > 0.03 * ratio_width:
raise ValueError('Image has incorrect dimensions')
def extract_image_wand(pagenr, wand_image):
return pdfimage.as_pil_image()
def extract_image_wand(pagenr, reader):
    """Flattens a page from a PDF to an image array

    This method copies the requested page into a new single-page PDF,
    lets Wand render that PDF at a resolution of 300 and returns the
    result as a PIL image.

    Parameters
    ----------
    pagenr : int
        Page number to extract, starting at 0
    reader : pikepdf.Pdf instance
        The pdf reader to read the page from

    Returns
    -------
    img_array : PIL Image
        The extracted image data
    """
    # Write the single page to an in-memory buffer. Using BytesIO directly
    # avoids reaching into the private `_file` attribute of a
    # SpooledTemporaryFile, and the context manager closes the temporary PDF.
    pdf_buffer = BytesIO()
    with Pdf.new() as page_pdf:
        page_pdf.pages.append(reader.pages[pagenr])
        page_pdf.save(pdf_buffer)

    with WandImage(blob=pdf_buffer.getvalue(), format='pdf', resolution=300) as page_image:
        page_image.format = 'jpg'
        jpeg_blob = page_image.make_blob(format="jpg")

    img = Image.open(BytesIO(jpeg_blob))
    img.load()  # Force PIL to decode now so the image does not depend on the buffer

    return img
......@@ -322,14 +321,6 @@ def process_page(image_data, exam_config, output_dir=None, strict=False):
else:
image_array = realign_image(image_array, corner_keypoints)
# get new corner markers of the realigned image
corner_keypoints = find_corner_marker_keypoints(image_array)
try:
check_corner_keypoints(image_array, corner_keypoints)
except RuntimeError as e:
if strict:
return False, str(e)
try:
barcode, upside_down = decode_barcode(image_array, exam_config)
if upside_down:
......@@ -349,7 +340,7 @@ def process_page(image_data, exam_config, output_dir=None, strict=False):
sub, exam = update_database(image_path, barcode)
try:
add_feedback_to_solution(sub, exam, barcode.page, image_array, corner_keypoints)
add_feedback_to_solution(sub, exam, barcode.page, image_array)
except RuntimeError as e:
if strict:
return False, str(e)
......@@ -730,7 +721,7 @@ def realign_image(image_array, keypoints=None,
keypoints = find_corner_marker_keypoints(image_array)
check_corner_keypoints(image_array, keypoints)
if (len(keypoints) != 4):
if(len(keypoints) != 4):
keypoints = fix_corner_markers(keypoints, image_array.shape)
# use standard keypoints if no custom ones are provided
......