Skip to main content

通过 Celery 的异步查询

Celery

在大型分析数据库中,运行执行几分钟或几小时的查询是很常见的。为了支持超出典型 Web 请求超时(30-60秒)的长时间运行查询, 有必要为 Superset 配置一个异步后端,它包括:

  • 一个或多个 Superset worker(其实现为 Celery worker),并且可以使用 celery worker 命令启动, 运行celery worker --help以查看相关选项。
  • 一个 celery broker(消息队列),我们建议使用 Redis 或 RabbitMQ
  • 一个 results backend,定义了 worker 将持久化查询结果的位置

配置 Celery 需要在您的 superset_config.py 中定义一个 CELERY_CONFIG。 woker 和 web server 进程都应该有相同的配置。

class CeleryConfig(object):
broker_url = "redis://localhost:6379/0"
imports = (
"superset.sql_lab",
"superset.tasks.scheduler",
)
result_backend = "redis://localhost:6379/0"
worker_prefetch_multiplier = 10
task_acks_late = True
task_annotations = {
"sql_lab.get_sql_results": {
"rate_limit": "100/s",
},
}

CELERY_CONFIG = CeleryConfig

要启动一个 Celery worker 以利用配置,运行以下命令:

celery --app=superset.tasks.celery_app:app worker --pool=prefork -O fair -c 4

要启动一个调度周期性后台任务的任务(beat 调度器),运行以下命令:

celery --app=superset.tasks.celery_app:app beat

为了设置结果后端,你需要传递一个从 flask_caching.backends.base.BaseCache 派生类的 实例到你的 superset_config.py 中的 RESULTS_BACKEND 配置键。你可以 使用 Memcached、Redis、S3(https://pypi.python.org/pypi/s3werkzeugcache)、内存或文件系统 (在单服务器类型的设置或测试中),或者编写自己的缓存接口。 你的 superset_config.py 可能看起来像这样:

# On S3
from s3cache.s3cache import S3Cache
S3_CACHE_BUCKET = 'foobar-superset'
S3_CACHE_KEY_PREFIX = 'sql_lab_result'
RESULTS_BACKEND = S3Cache(S3_CACHE_BUCKET, S3_CACHE_KEY_PREFIX)

# On Redis
from flask_caching.backends.rediscache import RedisCache
RESULTS_BACKEND = RedisCache(
host='localhost', port=6379, key_prefix='superset_results')

为了提高性能,现在使用 MessagePackPyArrow 进行结果序列化。 如果出现任何问题,可以通过在你的 superset_config.py 中设置 RESULTS_BACKEND_USE_MSGPACK = False 来禁用它。 在升级现有环境时,请清除现有的结果缓存存储。

重要说明

  • 所有集群中的 Superset worker 节点和 Web server 共享一个共同的元数据库 非常重要。 这意味着 SQLite 在这种情况下无法工作,因为它对并发的支持有限,通常存在于本地文件系统上。

  • 在整个设置中 只应运行一个 celery beat 实例 。否则,后台任务可能会被多次调度,导致奇怪的行为, 如报告重复发送、高于预期的负载/流量等。

  • SQL Lab 仅在 你在数据库设置中启用异步查询执行(Sources > Databases > Edit record)的情况下异步运行你的查询。

Celery Flower

Flower 是一个基于 Web 的工具,用于监控 Celery 集群,你可以从 pip 安装:

pip install flower

你可以使用以下命令运行 flower:

celery --app=superset.tasks.celery_app:app flower