ZerethShell
File Manager
SQL Manager
/
opt
/
hc_python
/
lib
/
python3.12
/
site-packages
/
sentry_sdk
/
integrations
rq.py
import functools
import weakref

import sentry_sdk
from sentry_sdk.api import continue_trace
from sentry_sdk.consts import OP, SPANDATA
from sentry_sdk.integrations import DidNotEnable, Integration, _check_minimum_version
from sentry_sdk.integrations.logging import ignore_logger
from sentry_sdk.scope import Scope, should_send_default_pii
from sentry_sdk.traces import SegmentSource
from sentry_sdk.tracing import TransactionSource
from sentry_sdk.tracing_utils import has_span_streaming_enabled
from sentry_sdk.utils import (
    SENSITIVE_DATA_SUBSTITUTE,
    capture_internal_exceptions,
    event_from_exception,
    format_timestamp,
    parse_version,
)

try:
    from rq.job import JobStatus
    from rq.queue import Queue
    from rq.timeouts import JobTimeoutException
    from rq.version import VERSION as RQ_VERSION
    from rq.worker import Worker
except ImportError:
    raise DidNotEnable("RQ not installed")

try:
    from rq.worker import BaseWorker

    # A BaseWorker without `perform_job` cannot be patched; fall back to Worker.
    if not hasattr(BaseWorker, "perform_job"):
        BaseWorker = None
except ImportError:
    BaseWorker = None

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from typing import Any, Callable

    from rq.job import Job

    from sentry_sdk._types import Event, EventProcessor
    from sentry_sdk.utils import ExcInfo


class RqIntegration(Integration):
    """Sentry integration for RQ.

    Monkeypatches the RQ worker class (`perform_job`, `handle_exception`)
    and `Queue.enqueue_job` so that jobs run inside their own scope, traces
    are propagated from the enqueuing side to the worker, and exceptions of
    failed jobs are reported.
    """

    identifier = "rq"
    origin = f"auto.queue.{identifier}"

    @staticmethod
    def setup_once() -> None:
        """Check the installed RQ version and install the patches."""
        version = parse_version(RQ_VERSION)
        _check_minimum_version(RqIntegration, version)

        # In rq 2.7.0+, SimpleWorker inherits from BaseWorker directly
        # instead of Worker, so we need to patch BaseWorker to cover both.
        # For older versions where BaseWorker doesn't exist or doesn't have
        # perform_job, we patch Worker.
        worker_cls = BaseWorker if BaseWorker is not None else Worker

        old_perform_job = worker_cls.perform_job

        # Runs the job inside a fresh scope, wrapped in either a streamed
        # span or a transaction that continues the trace headers stored in
        # `job.meta` by the patched `enqueue_job`.
        @functools.wraps(old_perform_job)
        def sentry_patched_perform_job(
            self: "Any", job: "Job", *args: "Queue", **kwargs: "Any"
        ) -> bool:
            client = sentry_sdk.get_client()
            if client.get_integration(RqIntegration) is None:
                return old_perform_job(self, job, *args, **kwargs)

            with sentry_sdk.new_scope() as scope:
                scope.clear_breadcrumbs()
                # Only a weak reference is handed to the event processor.
                scope.add_event_processor(_make_event_processor(weakref.ref(job)))

                if has_span_streaming_enabled(client.options):
                    sentry_sdk.traces.continue_trace(
                        job.meta.get("_sentry_trace_headers") or {}
                    )
                    Scope.set_custom_sampling_context({"rq_job": job})

                    # Reading `func_name` is best-effort; on failure the
                    # span keeps the placeholder name below.
                    func_name = None
                    with capture_internal_exceptions():
                        func_name = job.func_name

                    with sentry_sdk.traces.start_span(
                        name="unknown RQ task" if func_name is None else func_name,
                        attributes={
                            "sentry.op": OP.QUEUE_TASK_RQ,
                            "sentry.origin": RqIntegration.origin,
                            "sentry.span.source": SegmentSource.TASK,
                            SPANDATA.MESSAGING_MESSAGE_ID: job.id,
                        },
                        parent_span=None,
                    ) as span:
                        if func_name is not None:
                            span.set_attribute(SPANDATA.CODE_FUNCTION_NAME, func_name)
                        rv = old_perform_job(self, job, *args, **kwargs)
                else:
                    transaction = continue_trace(
                        job.meta.get("_sentry_trace_headers") or {},
                        op=OP.QUEUE_TASK_RQ,
                        name="unknown RQ task",
                        source=TransactionSource.TASK,
                        origin=RqIntegration.origin,
                    )

                    # Best-effort rename; the placeholder name stays if
                    # `func_name` cannot be read.
                    with capture_internal_exceptions():
                        transaction.name = job.func_name

                    with sentry_sdk.start_transaction(
                        transaction,
                        custom_sampling_context={"rq_job": job},
                    ):
                        rv = old_perform_job(self, job, *args, **kwargs)

            if self.is_horse:
                # We're inside of a forked process and RQ is
                # about to call `os._exit`. Make sure that our
                # events get sent out.
                sentry_sdk.get_client().flush()

            return rv

        worker_cls.perform_job = sentry_patched_perform_job

        old_handle_exception = worker_cls.handle_exception

        # Reports the exception only once the job is marked failed and has
        # no retries left, then always delegates to the original handler.
        @functools.wraps(old_handle_exception)
        def sentry_patched_handle_exception(
            self: "Worker", job: "Any", *exc_info: "Any", **kwargs: "Any"
        ) -> "Any":
            retry = (
                hasattr(job, "retries_left")
                and job.retries_left
                and job.retries_left > 0
            )
            failed = job._status == JobStatus.FAILED or job.is_failed
            if failed and not retry:
                _capture_exception(exc_info)

            return old_handle_exception(self, job, *exc_info, **kwargs)

        worker_cls.handle_exception = sentry_patched_handle_exception

        old_enqueue_job = Queue.enqueue_job

        # Stores the current trace propagation headers in `job.meta` when a
        # span is active, so the worker side can continue the trace.
        @functools.wraps(old_enqueue_job)
        def sentry_patched_enqueue_job(
            self: "Queue", job: "Any", **kwargs: "Any"
        ) -> "Any":
            client = sentry_sdk.get_client()
            if client.get_integration(RqIntegration) is None:
                return old_enqueue_job(self, job, **kwargs)

            scope = sentry_sdk.get_current_scope()
            span = (
                scope.streamed_span
                if has_span_streaming_enabled(client.options)
                else scope.span
            )
            if span is not None:
                job.meta["_sentry_trace_headers"] = dict(
                    scope.iter_trace_propagation_headers()
                )

            return old_enqueue_job(self, job, **kwargs)

        Queue.enqueue_job = sentry_patched_enqueue_job

        ignore_logger("rq.worker")


def _make_event_processor(weak_job: "Callable[[], Job]") -> "EventProcessor":
    """Build an event processor that enriches events with RQ job data.

    `weak_job` is a callable (a weak reference) returning the job, or
    `None` once the job has been garbage-collected. The returned processor
    adds job details under `event["extra"]["rq-job"]` and sets a dedicated
    fingerprint for `JobTimeoutException` events.
    """

    def event_processor(event: "Event", hint: "dict[str, Any]") -> "Event":
        job = weak_job()
        if job is None:
            # The job is gone: nothing to attach, and the fingerprint
            # below needs `job.func_name`, so return the event as-is.
            return event

        with capture_internal_exceptions():
            extra = event.setdefault("extra", {})
            # Arguments are only included when sending PII is allowed.
            rq_job = {
                "job_id": job.id,
                "func": job.func_name,
                "args": (
                    job.args
                    if should_send_default_pii()
                    else SENSITIVE_DATA_SUBSTITUTE
                ),
                "kwargs": (
                    job.kwargs
                    if should_send_default_pii()
                    else SENSITIVE_DATA_SUBSTITUTE
                ),
                "description": job.description,
            }

            if job.enqueued_at:
                rq_job["enqueued_at"] = format_timestamp(job.enqueued_at)
            if job.started_at:
                rq_job["started_at"] = format_timestamp(job.started_at)

            extra["rq-job"] = rq_job

        if "exc_info" in hint:
            with capture_internal_exceptions():
                if issubclass(hint["exc_info"][0], JobTimeoutException):
                    event["fingerprint"] = ["rq", "JobTimeoutException", job.func_name]

        return event

    return event_processor


def _capture_exception(exc_info: "ExcInfo", **kwargs: "Any") -> None:
    """Convert `exc_info` into an unhandled "rq" event and capture it.

    Extra keyword arguments are accepted but not used.
    """
    client = sentry_sdk.get_client()

    event, hint = event_from_exception(
        exc_info,
        client_options=client.options,
        mechanism={"type": "rq", "handled": False},
    )

    sentry_sdk.capture_event(event, hint=hint)
Kaydet
Vazgeç