Commit 9bf5d1d5 authored by Hugo Kerstens

Implement Celery to handle pdf processing

parent f5addf17
......@@ -3,7 +3,7 @@
"main": "index.js",
"license": "AGPL-3.0",
"scripts": {
"dev": "concurrently --kill-others --names \"WEBPACK,PYTHON\" --prefix-colors \"bgBlue.bold,bgGreen.bold\" \"webpack-dev-server --hot --inline --progress --config webpack.dev.js\" \"ZESJE_SETTINGS=$(pwd)/zesje.dev.cfg python3 zesje\"",
"dev": "concurrently --kill-others --names \"WEBPACK,PYTHON,CELERY\" --prefix-colors \"bgBlue.bold,bgGreen.bold,bgRed.bold\" \"webpack-dev-server --hot --inline --progress --config webpack.dev.js\" \"ZESJE_SETTINGS=$(pwd)/zesje.dev.cfg python3 zesje\" \"ZESJE_SETTINGS=$(pwd)/zesje.dev.cfg celery -A zesje.celery worker\"",
"build": "webpack --config webpack.prod.js",
"ci": "yarn lint; yarn test",
"lint": "yarn lint:js; yarn lint:py",
......
......@@ -6,6 +6,8 @@ sqlalchemy
Flask-Migrate
alembic
pyyaml
celery
redis
# General utilities
numpy
......
""" Init file that starts a Flask dev server and opens db """
import os
from os.path import abspath, dirname
from flask import Flask
from flask_migrate import Migrate
from werkzeug.exceptions import NotFound
from .factory import create_app, make_celery
from .api import api_bp
from .database import db
from ._version import __version__
__all__ = ['__version__', 'app']
STATIC_FOLDER_PATH = os.path.join(abspath(dirname(__file__)), 'static')
app = create_app()
app = Flask(__name__, static_folder=STATIC_FOLDER_PATH)
app.register_blueprint(api_bp, url_prefix='/api')
if 'ZESJE_SETTINGS' in os.environ:
app.config.from_envvar('ZESJE_SETTINGS')
# Default settings
app.config.update(
DATA_DIRECTORY=abspath(app.config.get('DATA_DIRECTORY', 'data')),
)
# These reference DATA_DIRECTORY, so they need to be in a separate update
app.config.update(
SCAN_DIRECTORY=os.path.join(app.config['DATA_DIRECTORY'], 'scans'),
DB_PATH=os.path.join(app.config['DATA_DIRECTORY'], 'course.sqlite'),
)
app.config.update(
SQLALCHEMY_DATABASE_URI='sqlite:///' + app.config['DB_PATH'],
SQLALCHEMY_TRACK_MODIFICATIONS=False # Suppress future deprecation warning
)
os.makedirs(app.config['DATA_DIRECTORY'], exist_ok=True)
os.makedirs(app.config['SCAN_DIRECTORY'], exist_ok=True)
db.init_app(app)
migrate = Migrate(app, db)
celery = make_celery(app)
@app.route('/')
......
import multiprocessing
import os
from flask import current_app as app
......@@ -86,13 +85,7 @@ class Scans(Resource):
# TODO: save these into a process-local datastructure, or save
# it into the DB as well so that we can cull 'processing' tasks
# that are actually dead.
# Because sharing a database connection with a subprocess is dangerous,
# we use the slower "spawn" method that fires up a new process instead
# of forking.
kwargs = {'scan_id': scan.id, 'app_config': app.config}
ctx = multiprocessing.get_context('spawn')
ctx.Process(target=process_pdf, kwargs=kwargs).start()
process_pdf.delay(scan_id=scan.id)
return {
'id': scan.id,
......
import os
from os.path import abspath, dirname
from flask import Flask
from celery import Celery
from .database import db
from flask_migrate import Migrate
STATIC_FOLDER_PATH = os.path.join(abspath(dirname(__file__)), 'static')
def create_app():
    """Create and configure the zesje Flask application.

    Configuration precedence (lowest to highest): built-in defaults,
    then the file pointed to by the ZESJE_SETTINGS environment variable.
    Derived paths (scan directory, database path) are computed from
    DATA_DIRECTORY after it is resolved.

    Returns
    -------
    Flask
        The configured application, with SQLAlchemy and Flask-Migrate
        initialised against it.
    """
    app = Flask(__name__, static_folder=STATIC_FOLDER_PATH)

    # User-supplied settings are loaded first so later defaults can defer
    # to them.
    if 'ZESJE_SETTINGS' in os.environ:
        app.config.from_envvar('ZESJE_SETTINGS')

    # Default settings
    app.config.update(
        DATA_DIRECTORY=abspath(app.config.get('DATA_DIRECTORY', 'data')),
    )

    # These reference DATA_DIRECTORY, so they need to be in a separate update
    app.config.update(
        SCAN_DIRECTORY=os.path.join(app.config['DATA_DIRECTORY'], 'scans'),
        DB_PATH=os.path.join(app.config['DATA_DIRECTORY'], 'course.sqlite'),
    )

    app.config.update(
        SQLALCHEMY_DATABASE_URI='sqlite:///' + app.config['DB_PATH'],
        SQLALCHEMY_TRACK_MODIFICATIONS=False  # Suppress future deprecation warning
    )

    # Fall back to a local Redis instance only when ZESJE_SETTINGS did not
    # configure a broker/backend; the previous unconditional update() would
    # silently clobber user-provided values.
    app.config.setdefault('CELERY_BROKER_URL', 'redis://localhost:6379')
    app.config.setdefault('CELERY_RESULT_BACKEND', 'redis://localhost:6379')

    db.init_app(app)
    Migrate(app, db)

    return app
def make_celery(app=None):
    """Build a Celery instance whose tasks run inside the Flask app context.

    Parameters
    ----------
    app : Flask, optional
        Application to bind the Celery instance to. When omitted, a fresh
        application is created via ``create_app()``.

    Returns
    -------
    Celery
        Configured Celery instance; its ``Task`` base class pushes the
        Flask application context around every task invocation so tasks
        can use ``current_app``, the database session, etc.
    """
    flask_app = create_app() if app is None else app

    celery = Celery(
        flask_app.import_name,
        backend=flask_app.config['CELERY_RESULT_BACKEND'],
        broker=flask_app.config['CELERY_BROKER_URL'],
    )
    # Mirror the full Flask configuration into Celery's own config.
    celery.conf.update(flask_app.config)

    base_task = celery.Task

    class ContextTask(base_task):
        """Task wrapper that enters the Flask app context before running."""

        abstract = True

        def __call__(self, *args, **kwargs):
            with flask_app.app_context():
                return base_task.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery
......@@ -11,18 +11,21 @@ import numpy as np
import PyPDF2
from PIL import Image
from pylibdmtx import pylibdmtx
from flask import Flask
from .database import db, Scan, Exam, Page, Student, Submission, Solution, ExamWidget
from .datamatrix import decode_raw_datamatrix
from .images import guess_dpi, get_box
from .factory import make_celery
ExtractedBarcode = namedtuple('ExtractedBarcode', ['token', 'copy', 'page'])
ExamMetadata = namedtuple('ExamMetadata', ['token', 'barcode_coords'])
celery = make_celery()
@celery.task()
def process_pdf(scan_id, app_config=None):
"""Process a PDF, recording progress to a database
......@@ -37,6 +40,10 @@ def process_pdf(scan_id, app_config=None):
def raise_exit(signo, frame):
raise SystemExit('PDF processing was killed by an external signal')
if app_config is None:
from flask import current_app
app_config = current_app.config
# We want to trigger an exit from within Python when a signal is received.
# The default behaviour for SIGTERM is to terminate the process without
# calling 'atexit' handlers, and SIGINT raises keyboard interrupt.
......@@ -59,55 +66,48 @@ def _process_pdf(scan_id, app_config):
report_progress = functools.partial(write_pdf_status, scan_id, 'processing')
report_success = functools.partial(write_pdf_status, scan_id, 'success')
app = Flask(__name__)
app.config.update(app_config)
db.init_app(app)
with app.app_context():
db.create_all()
# Raises exception if zero or more than one scans found
scan = Scan.query.filter(Scan.id == scan_id).one()
pdf_path = os.path.join(data_directory, 'scans', f'{scan.id}.pdf')
output_directory = os.path.join(data_directory, f'{scan.exam.id}_data')
try:
exam_config = exam_metadata(scan.exam.id)
except Exception as e:
report_error(f'Error while reading Exam metadata: {e}')
raise
total = PyPDF2.PdfFileReader(open(pdf_path, "rb")).getNumPages()
failures = []
try:
for image, page in extract_images(pdf_path):
report_progress(f'Processing page {page} / {total}')
try:
success, description = process_page(
image, exam_config, output_directory
)
if not success:
print(description)
failures.append(page)
except Exception as e:
report_error(f'Error processing page {page}: {e}')
return
except Exception as e:
report_error(f"Failed to read pdf: {e}")
raise
if failures:
processed = total - len(failures)
if processed:
report_error(
f'Processed {processed} / {total} pages. '
f'Failed on pages: {failures}'
# Raises exception if zero or more than one scans found
scan = Scan.query.filter(Scan.id == scan_id).one()
pdf_path = os.path.join(data_directory, 'scans', f'{scan.id}.pdf')
output_directory = os.path.join(data_directory, f'{scan.exam.id}_data')
try:
exam_config = exam_metadata(scan.exam.id)
except Exception as e:
report_error(f'Error while reading Exam metadata: {e}')
raise
total = PyPDF2.PdfFileReader(open(pdf_path, "rb")).getNumPages()
failures = []
try:
for image, page in extract_images(pdf_path):
report_progress(f'Processing page {page} / {total}')
try:
success, description = process_page(
image, exam_config, output_directory
)
else:
report_error(f'Failed on all {total} pages.')
if not success:
print(description)
failures.append(page)
except Exception as e:
report_error(f'Error processing page {page}: {e}')
return
except Exception as e:
report_error(f"Failed to read pdf: {e}")
raise
if failures:
processed = total - len(failures)
if processed:
report_error(
f'Processed {processed} / {total} pages. '
f'Failed on pages: {failures}'
)
else:
report_success(f'Processed {total} pages.')
report_error(f'Failed on all {total} pages.')
else:
report_success(f'Processed {total} pages.')
def exam_metadata(exam_id):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment