Celery uses separate db engine with NullPool. (#1492)

* Celery uses separate db engine with NullPool.

* Address comment
This commit is contained in:
Bogdan
2016-10-31 16:02:12 -07:00
committed by GitHub
parent 4dc959a3e4
commit 4f49cb555b

View File

@@ -1,15 +1,20 @@
import celery
from datetime import datetime
import pandas as pd
import logging
import json
import logging
import pandas as pd
import sqlalchemy
import uuid
import zlib
from sqlalchemy.pool import NullPool
from sqlalchemy.orm import sessionmaker
from caravel import (
app, db, models, utils, dataframe, results_backend)
from caravel.db_engine_specs import LimitMethod
from caravel.jinja_context import process_template
QueryStatus = models.QueryStatus
celery_app = celery.Celery(config_source=app.config.get('CELERY_CONFIG'))
@@ -46,11 +51,18 @@ def create_table_as(sql, table_name, schema=None, override=False):
return exec_sql.format(**locals())
@celery_app.task
def get_sql_results(query_id, return_results=True, store_results=False):
@celery_app.task(bind=True)
def get_sql_results(self, query_id, return_results=True, store_results=False):
"""Executes the sql query returns the results."""
session = db.session()
session.commit() # HACK
if not self.request.called_directly:
engine = sqlalchemy.create_engine(
app.config.get('SQLALCHEMY_DATABASE_URI'), poolclass=NullPool)
session_class = sessionmaker()
session_class.configure(bind=engine)
session = session_class()
else:
session = db.session()
session.commit() # HACK
query = session.query(models.Query).filter_by(id=query_id).one()
database = query.database
executed_sql = query.sql.strip().strip(';')