console_subscriber/
lib.rs

#![doc = include_str!("../README.md")]
use console_api as proto;
use proto::{instrument::instrument_server::InstrumentServer, resources::resource};
use serde::Serialize;
#[cfg(feature = "vsock")]
use tokio_vsock::VsockListener;

use std::{
    cell::RefCell,
    fmt,
    net::{IpAddr, Ipv4Addr},
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    },
    time::{Duration, Instant},
};
use thread_local::ThreadLocal;
#[cfg(unix)]
use tokio::net::UnixListener;
use tokio::sync::{mpsc, oneshot};
#[cfg(unix)]
use tokio_stream::wrappers::UnixListenerStream;
use tracing_core::{
    span::{self, Id},
    subscriber::{self, Subscriber},
    Metadata,
};
use tracing_subscriber::{
    layer::Context,
    registry::{Extensions, LookupSpan},
    Layer,
};

mod aggregator;
mod attribute;
mod builder;
mod callsites;
mod record;
mod stack;
mod stats;
pub(crate) mod sync;
mod visitors;

pub use aggregator::Aggregator;
pub use builder::{Builder, ServerAddr};
use callsites::Callsites;
use record::Recorder;
use stack::SpanStack;
use visitors::{AsyncOpVisitor, ResourceVisitor, ResourceVisitorResult, TaskVisitor, WakerVisitor};

pub use builder::{init, spawn};

use crate::visitors::{PollOpVisitor, StateUpdateVisitor};

/// A [`ConsoleLayer`] is a [`tracing_subscriber::Layer`] that records [`tracing`]
/// spans and events emitted by the async runtime.
///
/// Runtimes emit [`tracing`] spans and events that represent specific operations
/// that occur in asynchronous Rust programs, such as spawning tasks and waker
/// operations. The `ConsoleLayer` collects and aggregates these events, and the
/// resulting diagnostic data is exported to clients by the corresponding gRPC
/// [`Server`] instance.
///
/// [`tracing`]: https://docs.rs/tracing
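///
/// # Examples
///
/// A minimal sketch of installing the layer by hand (the [`init`][`crate::init`]
/// and [`spawn`][`crate::spawn`] helpers wrap these steps); the returned
/// [`Server`] must be driven separately, e.g. via [`Server::serve`]:
///
/// ```rust
/// use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
///
/// let (console_layer, server) = console_subscriber::ConsoleLayer::builder().build();
/// tracing_subscriber::registry().with(console_layer).init();
/// // On an async runtime, drive the server so clients can connect:
/// // tokio::spawn(server.serve());
/// # drop(server);
/// ```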
pub struct ConsoleLayer {
    current_spans: ThreadLocal<RefCell<SpanStack>>,
    tx: mpsc::Sender<Event>,
    shared: Arc<Shared>,
    /// When the channel capacity goes under this number, a flush in the aggregator
    /// will be triggered.
    flush_under_capacity: usize,

    /// Set of callsites for spans representing spawned tasks.
    ///
    /// For task spans, each runtime will only have a handful of callsites
    /// (likely 1-5, max), so 8 should be plenty. If several runtimes are in
    /// use, we may have to spill over into the backup hashmap, but it's
    /// unlikely.
    spawn_callsites: Callsites<8>,

    /// Set of callsites for events representing waker operations.
    ///
    /// 16 is probably a reasonable number of waker ops; it's a bit generous if
    /// there's only one async runtime library in use, but if there are multiple,
    /// they might all have their own sets of waker ops.
    waker_callsites: Callsites<16>,

    /// Set of callsites for spans representing resources.
    ///
    /// TODO: Take some time to determine more reasonable numbers
    resource_callsites: Callsites<32>,

    /// Set of callsites for spans representing async operations on resources.
    ///
    /// TODO: Take some time to determine more reasonable numbers
    async_op_callsites: Callsites<32>,

    /// Set of callsites for spans representing async op poll operations.
    ///
    /// TODO: Take some time to determine more reasonable numbers
    async_op_poll_callsites: Callsites<32>,

    /// Set of callsites for events representing poll operation invocations on resources.
    ///
    /// TODO: Take some time to determine more reasonable numbers
    poll_op_callsites: Callsites<32>,

    /// Set of callsites for events representing attribute state updates on resources.
    ///
    /// TODO: Take some time to determine more reasonable numbers
    resource_state_update_callsites: Callsites<32>,

    /// Set of callsites for events representing attribute state updates on async resource ops.
    ///
    /// TODO: Take some time to determine more reasonable numbers
    async_op_state_update_callsites: Callsites<32>,

    /// A sink to record all events to a file.
    recorder: Option<Recorder>,

    /// Used to anchor monotonic timestamps to a base `SystemTime`, to produce a
    /// timestamp that can be sent over the wire or recorded to JSON.
    base_time: stats::TimeAnchor,

    /// Maximum value for the poll time histogram.
    ///
    /// By default, this is one second.
    max_poll_duration_nanos: u64,

    /// Maximum value for the scheduled time histogram.
    ///
    /// By default, this is one second.
    max_scheduled_duration_nanos: u64,
}

/// A gRPC [`Server`] that implements the [`tokio-console` wire format][wire].
///
/// Client applications, such as the [`tokio-console` CLI][cli], connect to the gRPC
/// server, and stream data about the runtime's history (such as a list of the
/// currently active tasks, or statistics summarizing polling times). A [`Server`] also
/// interprets commands from a client application, such as a request to focus in on
/// a specific task, and translates that into a stream of details specific to
/// that task.
///
/// [wire]: https://docs.rs/console-api
/// [cli]: https://crates.io/crates/tokio-console
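///
/// # Examples
///
/// A minimal sketch of driving a [`Server`] once it has been built (see
/// [`ConsoleLayer::builder`]):
///
/// ```rust
/// # async fn docs() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
/// # let (_, server) = console_subscriber::ConsoleLayer::new();
/// // Clients such as the `tokio-console` CLI can connect once this is running.
/// server.serve().await
/// # }
/// ```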
pub struct Server {
    subscribe: mpsc::Sender<Command>,
    addr: ServerAddr,
    aggregator: Option<Aggregator>,
    client_buffer: usize,
}

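/// Trait implemented by types that can be converted into their protobuf wire
/// representation, using `base_time` to translate monotonic `Instant`s into
/// wall-clock timestamps.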
pub(crate) trait ToProto {
    type Output;
    fn to_proto(&self, base_time: &stats::TimeAnchor) -> Self::Output;
}

/// State shared between the `ConsoleLayer` and the `Aggregator` task.
#[derive(Debug, Default)]
struct Shared {
    /// Used to notify the aggregator task when the event buffer should be
    /// flushed.
    flush: aggregator::Flush,

    /// A counter of how many task events were dropped because the event buffer
    /// was at capacity.
    dropped_tasks: AtomicUsize,

    /// A counter of how many async op events were dropped because the event buffer
    /// was at capacity.
    dropped_async_ops: AtomicUsize,

    /// A counter of how many resource events were dropped because the event buffer
    /// was at capacity.
    dropped_resources: AtomicUsize,
}

struct Watch<T>(mpsc::Sender<Result<T, tonic::Status>>);

enum Command {
    Instrument(Watch<proto::instrument::Update>),
    WatchTaskDetail(WatchRequest<proto::tasks::TaskDetails>),
    WatchState(Watch<proto::instrument::State>),
    Pause,
    Resume,
}

struct WatchRequest<T> {
    id: Id,
    stream_sender: oneshot::Sender<mpsc::Receiver<Result<T, tonic::Status>>>,
    buffer: usize,
}

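/// An event sent from the `ConsoleLayer` to the aggregator task.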
#[derive(Debug)]
enum Event {
    Metadata(&'static Metadata<'static>),
    Spawn {
        id: span::Id,
        metadata: &'static Metadata<'static>,
        stats: Arc<stats::TaskStats>,
        fields: Vec<proto::Field>,
        location: Option<proto::Location>,
    },
    Resource {
        id: span::Id,
        parent_id: Option<span::Id>,
        metadata: &'static Metadata<'static>,
        concrete_type: String,
        kind: resource::Kind,
        location: Option<proto::Location>,
        is_internal: bool,
        stats: Arc<stats::ResourceStats>,
    },
    PollOp {
        metadata: &'static Metadata<'static>,
        resource_id: span::Id,
        op_name: String,
        async_op_id: span::Id,
        task_id: span::Id,
        is_ready: bool,
    },
    AsyncResourceOp {
        id: span::Id,
        parent_id: Option<span::Id>,
        resource_id: span::Id,
        metadata: &'static Metadata<'static>,
        source: String,

        stats: Arc<stats::AsyncOpStats>,
    },
}

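/// A waker operation recorded from the runtime's waker events.
///
/// For the `Wake` and `WakeByRef` variants, `self_wake` indicates that the
/// task woke itself from within its own poll.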
#[derive(Clone, Debug, Copy, Serialize)]
enum WakeOp {
    Wake { self_wake: bool },
    WakeByRef { self_wake: bool },
    Clone,
    Drop,
}

impl ConsoleLayer {
    /// Returns a `ConsoleLayer` built with the default settings.
    ///
    /// Note: these defaults do *not* include values provided via the
    /// environment variables specified in [`Builder::with_default_env`].
    ///
    /// See also [`Builder::build`].
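    ///
    /// # Examples
    ///
    /// Constructing the layer and server pair with the default settings:
    ///
    /// ```rust
    /// use console_subscriber::ConsoleLayer;
    ///
    /// let (layer, server) = ConsoleLayer::new();
    /// # drop((layer, server));
    /// ```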
    pub fn new() -> (Self, Server) {
        Self::builder().build()
    }

    /// Returns a [`Builder`] for configuring a `ConsoleLayer`.
    ///
    /// Note that the returned builder does *not* include values provided via
    /// the environment variables specified in [`Builder::with_default_env`].
    /// To extract those, you can call that method on the returned builder.
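    ///
    /// # Examples
    ///
    /// A sketch of overriding a couple of defaults before building; the
    /// setters shown are among those documented on [`Builder`]:
    ///
    /// ```rust
    /// use console_subscriber::ConsoleLayer;
    /// use std::time::Duration;
    ///
    /// let (layer, server) = ConsoleLayer::builder()
    ///     .publish_interval(Duration::from_secs(2))
    ///     .retention(Duration::from_secs(30 * 60))
    ///     .build();
    /// # drop((layer, server));
    /// ```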
    pub fn builder() -> Builder {
        Builder::default()
    }

    fn build(config: Builder) -> (Self, Server) {
        // The `cfg` value *appears* to be a constant to clippy, but it changes
        // depending on the build-time configuration...
        #![allow(clippy::assertions_on_constants)]
        assert!(
            cfg!(any(tokio_unstable, console_without_tokio_unstable)),
            "task tracing requires Tokio to be built with RUSTFLAGS=\"--cfg tokio_unstable\"!"
        );

        let base_time = stats::TimeAnchor::new();
        tracing::debug!(
            config.event_buffer_capacity,
            config.client_buffer_capacity,
            ?config.publish_interval,
            ?config.retention,
            ?config.server_addr,
            ?config.recording_path,
            ?config.filter_env_var,
            ?config.poll_duration_max,
            ?config.scheduled_duration_max,
            ?base_time,
            "configured console subscriber"
        );

        let (tx, events) = mpsc::channel(config.event_buffer_capacity);
        let (subscribe, rpcs) = mpsc::channel(256);
        let shared = Arc::new(Shared::default());
        let aggregator = Aggregator::new(events, rpcs, &config, shared.clone(), base_time.clone());
        // Conservatively, start to trigger a flush when half the channel is full.
        // This tries to reduce the chance of losing events to a full channel.
        let flush_under_capacity = config.event_buffer_capacity / 2;
        let recorder = config
            .recording_path
            .as_ref()
            .map(|path| Recorder::new(path).expect("creating recorder"));
        let server = Server {
            aggregator: Some(aggregator),
            addr: config.server_addr,
            subscribe,
            client_buffer: config.client_buffer_capacity,
        };
        let layer = Self {
            current_spans: ThreadLocal::new(),
            tx,
            shared,
            flush_under_capacity,
            spawn_callsites: Callsites::default(),
            waker_callsites: Callsites::default(),
            resource_callsites: Callsites::default(),
            async_op_callsites: Callsites::default(),
            async_op_poll_callsites: Callsites::default(),
            poll_op_callsites: Callsites::default(),
            resource_state_update_callsites: Callsites::default(),
            async_op_state_update_callsites: Callsites::default(),
            recorder,
            base_time,
            max_poll_duration_nanos: config.poll_duration_max.as_nanos() as u64,
            max_scheduled_duration_nanos: config.scheduled_duration_max.as_nanos() as u64,
        };
        (layer, server)
    }
}

impl ConsoleLayer {
    /// Default maximum capacity for the channel of events sent from a
    /// [`ConsoleLayer`] to a [`Server`].
    ///
    /// When this capacity is exhausted, additional events will be dropped.
    /// Decreasing this value will reduce memory usage, but may result in
    /// events being dropped more frequently.
    ///
    /// See also [`Builder::event_buffer_capacity`].
    pub const DEFAULT_EVENT_BUFFER_CAPACITY: usize = 1024 * 100;

    /// Default maximum capacity for the channel of events sent from a
    /// [`Server`] to each subscribed client.
    ///
    /// When this capacity is exhausted, the client is assumed to be inactive,
    /// and may be disconnected.
    ///
    /// See also [`Builder::client_buffer_capacity`].
    pub const DEFAULT_CLIENT_BUFFER_CAPACITY: usize = 1024 * 4;

    /// Default frequency for publishing events to clients.
    ///
    /// Note that methods like [`init`][`crate::init`] and [`spawn`][`crate::spawn`] will take the value
    /// from the `TOKIO_CONSOLE_PUBLISH_INTERVAL` [environment variable] before falling
    /// back on this default.
    ///
    /// See also [`Builder::publish_interval`].
    ///
    /// [environment variable]: `Builder::with_default_env`
    pub const DEFAULT_PUBLISH_INTERVAL: Duration = Duration::from_secs(1);

    /// By default, completed spans are retained for one hour.
    ///
    /// Note that methods like [`init`][`crate::init`] and
    /// [`spawn`][`crate::spawn`] will take the value from the
    /// `TOKIO_CONSOLE_RETENTION` [environment variable] before falling back on
    /// this default.
    ///
    /// See also [`Builder::retention`].
    ///
    /// [environment variable]: `Builder::with_default_env`
    pub const DEFAULT_RETENTION: Duration = Duration::from_secs(60 * 60);

    /// The default maximum value for task poll duration histograms.
    ///
    /// Any poll duration exceeding this will be clamped to this value. By
    /// default, the maximum poll duration is one second.
    ///
    /// See also [`Builder::poll_duration_histogram_max`].
    pub const DEFAULT_POLL_DURATION_MAX: Duration = Duration::from_secs(1);

    /// The default maximum value for the task scheduled duration histogram.
    ///
    /// Any scheduled duration (the time from a task being woken until it is next
    /// polled) exceeding this will be clamped to this value. By default, the
    /// maximum scheduled duration is one second.
    ///
    /// See also [`Builder::scheduled_duration_histogram_max`].
    pub const DEFAULT_SCHEDULED_DURATION_MAX: Duration = Duration::from_secs(1);

    fn is_spawn(&self, meta: &'static Metadata<'static>) -> bool {
        self.spawn_callsites.contains(meta)
    }

    fn is_resource(&self, meta: &'static Metadata<'static>) -> bool {
        self.resource_callsites.contains(meta)
    }

    fn is_async_op(&self, meta: &'static Metadata<'static>) -> bool {
        self.async_op_callsites.contains(meta)
    }

    fn is_id_spawned<S>(&self, id: &span::Id, cx: &Context<'_, S>) -> bool
    where
        S: Subscriber + for<'a> LookupSpan<'a>,
    {
        cx.span(id)
            .map(|span| self.is_spawn(span.metadata()))
            .unwrap_or(false)
    }

    fn is_id_resource<S>(&self, id: &span::Id, cx: &Context<'_, S>) -> bool
    where
        S: Subscriber + for<'a> LookupSpan<'a>,
    {
        cx.span(id)
            .map(|span| self.is_resource(span.metadata()))
            .unwrap_or(false)
    }

    fn is_id_async_op<S>(&self, id: &span::Id, cx: &Context<'_, S>) -> bool
    where
        S: Subscriber + for<'a> LookupSpan<'a>,
    {
        cx.span(id)
            .map(|span| self.is_async_op(span.metadata()))
            .unwrap_or(false)
    }

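    /// Returns the ID of the most recently entered span on `stack` that
    /// matches the predicate `p`, if any.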
    fn first_entered<P>(&self, stack: &SpanStack, p: P) -> Option<span::Id>
    where
        P: Fn(&span::Id) -> bool,
    {
        stack
            .stack()
            .iter()
            .rev()
            .find(|id| p(id.id()))
            .map(|id| id.id())
            .cloned()
    }

    fn send_metadata(&self, dropped: &AtomicUsize, event: Event) -> bool {
        self.send_stats(dropped, move || (event, ())).is_some()
    }

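    /// Attempts to send the event produced by `mk_event` to the aggregator
    /// without blocking, returning the associated stats if it was sent and
    /// incrementing `dropped` otherwise. Triggers a flush in the aggregator
    /// when the channel's remaining capacity runs low.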
    fn send_stats<S>(
        &self,
        dropped: &AtomicUsize,
        mk_event: impl FnOnce() -> (Event, S),
    ) -> Option<S> {
        use mpsc::error::TrySendError;

        // Return whether or not we actually sent the event.
        let sent = match self.tx.try_reserve() {
            Ok(permit) => {
                let (event, stats) = mk_event();
                permit.send(event);
                Some(stats)
            }
            Err(TrySendError::Closed(_)) => {
                // we should warn here eventually, but nop for now because we
                // can't trigger tracing events...
                None
            }
            Err(TrySendError::Full(_)) => {
                // this shouldn't happen, since we trigger a flush when
                // approaching the high water line...but if the executor wait
                // time is very high, maybe the aggregator task hasn't been
                // polled yet. so... eek?!
                dropped.fetch_add(1, Ordering::Release);
                None
            }
        };

        let capacity = self.tx.capacity();
        if capacity <= self.flush_under_capacity {
            self.shared.flush.trigger();
        }

        sent
    }

    fn record(&self, event: impl FnOnce() -> record::Event) {
        if let Some(ref recorder) = self.recorder {
            recorder.record(event());
        }
    }

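    /// Applies a state-update event to the stats of the span with the given
    /// `id`, and also to its parent's stats if the parent inherits child
    /// attributes.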
    fn state_update<S>(
        &self,
        id: &Id,
        event: &tracing::Event<'_>,
        ctx: &Context<'_, S>,
        get_stats: impl for<'a> Fn(&'a Extensions) -> Option<&'a stats::ResourceStats>,
    ) where
        S: Subscriber + for<'a> LookupSpan<'a>,
    {
        let meta_id = event.metadata().into();
        let mut state_update_visitor = StateUpdateVisitor::new(meta_id);
        event.record(&mut state_update_visitor);

        let update = match state_update_visitor.result() {
            Some(update) => update,
            None => return,
        };

        let span = match ctx.span(id) {
            Some(span) => span,
            // XXX(eliza): no span exists for a resource ID, we should maybe
            // record an error here...
            None => return,
        };

        let exts = span.extensions();
        let stats = match get_stats(&exts) {
            Some(stats) => stats,
            // XXX(eliza): a resource span was not a resource??? this is a bug
            None => return,
        };

        stats.update_attribute(id, &update);

        if let Some(parent) = stats.parent_id.as_ref().and_then(|parent| ctx.span(parent)) {
            let exts = parent.extensions();
            if let Some(stats) = get_stats(&exts) {
                if stats.inherit_child_attributes {
                    stats.update_attribute(id, &update);
                }
            }
        }
    }
}

impl<S> Layer<S> for ConsoleLayer
where
    S: Subscriber + for<'a> LookupSpan<'a>,
{
    fn register_callsite(&self, meta: &'static Metadata<'static>) -> subscriber::Interest {
        if !meta.is_span() && !meta.is_event() {
            return subscriber::Interest::never();
        }

        let dropped = match (meta.name(), meta.target()) {
            ("runtime.spawn", _) | ("task", "tokio::task") => {
                self.spawn_callsites.insert(meta);
                &self.shared.dropped_tasks
            }
            (_, "runtime::waker") | (_, "tokio::task::waker") => {
                self.waker_callsites.insert(meta);
                &self.shared.dropped_tasks
            }
            (ResourceVisitor::RES_SPAN_NAME, _) => {
                self.resource_callsites.insert(meta);
                &self.shared.dropped_resources
            }
            (AsyncOpVisitor::ASYNC_OP_SPAN_NAME, _) => {
                self.async_op_callsites.insert(meta);
                &self.shared.dropped_async_ops
            }
            ("runtime.resource.async_op.poll", _) => {
                self.async_op_poll_callsites.insert(meta);
                &self.shared.dropped_async_ops
            }
            (_, PollOpVisitor::POLL_OP_EVENT_TARGET) => {
                self.poll_op_callsites.insert(meta);
                &self.shared.dropped_async_ops
            }
            (_, StateUpdateVisitor::RE_STATE_UPDATE_EVENT_TARGET) => {
                self.resource_state_update_callsites.insert(meta);
                &self.shared.dropped_resources
            }
            (_, StateUpdateVisitor::AO_STATE_UPDATE_EVENT_TARGET) => {
                self.async_op_state_update_callsites.insert(meta);
                &self.shared.dropped_async_ops
            }
            (_, _) => &self.shared.dropped_tasks,
        };

        self.send_metadata(dropped, Event::Metadata(meta));
        subscriber::Interest::always()
    }

    fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
        let metadata = attrs.metadata();
        if self.is_spawn(metadata) {
            let at = Instant::now();
            let mut task_visitor = TaskVisitor::new(metadata.into());
            attrs.record(&mut task_visitor);
            let (fields, location) = task_visitor.result();
            self.record(|| record::Event::Spawn {
                id: id.into_u64(),
                at: self.base_time.to_system_time(at),
                fields: record::SerializeFields(fields.clone()),
            });
            if let Some(stats) = self.send_stats(&self.shared.dropped_tasks, move || {
                let stats = Arc::new(stats::TaskStats::new(
                    self.max_poll_duration_nanos,
                    self.max_scheduled_duration_nanos,
                    at,
                ));
                let event = Event::Spawn {
                    id: id.clone(),
                    stats: stats.clone(),
                    metadata,
                    fields,
                    location,
                };
                (event, stats)
            }) {
                ctx.span(id).expect("if `on_new_span` was called, the span must exist; this is a `tracing` bug!").extensions_mut().insert(stats);
            }
            return;
        }

        if self.is_resource(metadata) {
            let at = Instant::now();
            let mut resource_visitor = ResourceVisitor::default();
            attrs.record(&mut resource_visitor);
            if let Some(result) = resource_visitor.result() {
                let ResourceVisitorResult {
                    concrete_type,
                    kind,
                    location,
                    is_internal,
                    inherit_child_attrs,
                } = result;
                let parent_id = self.current_spans.get().and_then(|stack| {
                    self.first_entered(&stack.borrow(), |id| self.is_id_resource(id, &ctx))
                });
                if let Some(stats) = self.send_stats(&self.shared.dropped_resources, move || {
                    let stats = Arc::new(stats::ResourceStats::new(
                        at,
                        inherit_child_attrs,
                        parent_id.clone(),
                    ));
                    let event = Event::Resource {
                        id: id.clone(),
                        parent_id,
                        metadata,
                        concrete_type,
                        kind,
                        location,
                        is_internal,
                        stats: stats.clone(),
                    };
                    (event, stats)
                }) {
                    ctx.span(id).expect("if `on_new_span` was called, the span must exist; this is a `tracing` bug!").extensions_mut().insert(stats);
                }
            }
            return;
        }

        if self.is_async_op(metadata) {
            let at = Instant::now();
            let mut async_op_visitor = AsyncOpVisitor::default();
            attrs.record(&mut async_op_visitor);
            if let Some((source, inherit_child_attrs)) = async_op_visitor.result() {
                let resource_id = self.current_spans.get().and_then(|stack| {
                    self.first_entered(&stack.borrow(), |id| self.is_id_resource(id, &ctx))
                });

                let parent_id = self.current_spans.get().and_then(|stack| {
                    self.first_entered(&stack.borrow(), |id| self.is_id_async_op(id, &ctx))
                });

                if let Some(resource_id) = resource_id {
                    if let Some(stats) =
                        self.send_stats(&self.shared.dropped_async_ops, move || {
                            let stats = Arc::new(stats::AsyncOpStats::new(
                                at,
                                inherit_child_attrs,
                                parent_id.clone(),
                            ));
                            let event = Event::AsyncResourceOp {
                                id: id.clone(),
                                parent_id,
                                resource_id,
                                metadata,
                                source,
                                stats: stats.clone(),
                            };
                            (event, stats)
                        })
                    {
                        ctx.span(id).expect("if `on_new_span` was called, the span must exist; this is a `tracing` bug!").extensions_mut().insert(stats);
                    }
                }
            }
        }
    }

    fn on_event(&self, event: &tracing::Event<'_>, ctx: Context<'_, S>) {
        let metadata = event.metadata();
        if self.waker_callsites.contains(metadata) {
            let at = Instant::now();
            let mut visitor = WakerVisitor::default();
            event.record(&mut visitor);
            // XXX (eliza): ew...
            if let Some((id, mut op)) = visitor.result() {
                if let Some(span) = ctx.span(&id) {
                    let exts = span.extensions();
                    if let Some(stats) = exts.get::<Arc<stats::TaskStats>>() {
                        if op.is_wake() {
                            // Are we currently inside the task's span? If so, the task
                            // has woken itself.

                            let self_wake = self
                                .current_spans
                                .get()
                                .map(|spans| spans.borrow().iter().any(|span| span == &id))
                                .unwrap_or(false);
                            op = op.self_wake(self_wake);
                        }

                        stats.record_wake_op(op, at);
                        self.record(|| record::Event::Waker {
                            id: id.into_u64(),
                            at: self.base_time.to_system_time(at),
                            op,
                        });
                    }
                }
            }
            return;
        }

        if self.poll_op_callsites.contains(metadata) {
            let resource_id = self.current_spans.get().and_then(|stack| {
                self.first_entered(&stack.borrow(), |id| self.is_id_resource(id, &ctx))
            });
            // a poll op event should have a resource span as its parent
            if let Some(resource_id) = resource_id {
                let mut poll_op_visitor = PollOpVisitor::default();
                event.record(&mut poll_op_visitor);
                if let Some((op_name, is_ready)) = poll_op_visitor.result() {
                    let task_and_async_op_ids = self.current_spans.get().and_then(|stack| {
                        let stack = stack.borrow();
                        let task_id =
                            self.first_entered(&stack, |id| self.is_id_spawned(id, &ctx))?;
                        let async_op_id =
                            self.first_entered(&stack, |id| self.is_id_async_op(id, &ctx))?;
                        Some((task_id, async_op_id))
                    });
                    // a poll op event should be emitted in the context of both an
                    // async op span and a task span
                    if let Some((task_id, async_op_id)) = task_and_async_op_ids {
                        if let Some(span) = ctx.span(&async_op_id) {
                            let exts = span.extensions();
                            if let Some(stats) = exts.get::<Arc<stats::AsyncOpStats>>() {
                                stats.set_task_id(&task_id);
                            }
                        }

                        self.send_stats(&self.shared.dropped_async_ops, || {
                            let event = Event::PollOp {
                                metadata,
                                op_name,
                                resource_id,
                                async_op_id,
                                task_id,
                                is_ready,
                            };
                            (event, ())
                        });

                        // TODO: JSON recorder doesn't care about poll ops.
                    }
                }
            }
            return;
        }

        if self.resource_state_update_callsites.contains(metadata) {
            // a state update event should have a resource span as its parent
            let resource_id = self.current_spans.get().and_then(|stack| {
                self.first_entered(&stack.borrow(), |id| self.is_id_resource(id, &ctx))
            });
            if let Some(id) = resource_id {
                self.state_update(&id, event, &ctx, |exts| {
                    exts.get::<Arc<stats::ResourceStats>>()
                        .map(<Arc<stats::ResourceStats> as std::ops::Deref>::deref)
                });
            }

            return;
        }

        if self.async_op_state_update_callsites.contains(metadata) {
            let async_op_id = self.current_spans.get().and_then(|stack| {
                self.first_entered(&stack.borrow(), |id| self.is_id_async_op(id, &ctx))
            });
            if let Some(id) = async_op_id {
                self.state_update(&id, event, &ctx, |exts| {
                    let async_op = exts.get::<Arc<stats::AsyncOpStats>>()?;
                    Some(&async_op.stats)
                });
            }
        }
    }

    fn on_enter(&self, id: &span::Id, cx: Context<'_, S>) {
        if let Some(span) = cx.span(id) {
            let now = Instant::now();
            let exts = span.extensions();
            // if the span we are entering is a task or async op, record the
            // poll stats.
            if let Some(stats) = exts.get::<Arc<stats::TaskStats>>() {
                stats.start_poll(now);
            } else if let Some(stats) = exts.get::<Arc<stats::AsyncOpStats>>() {
                stats.start_poll(now);
            } else if exts.get::<Arc<stats::ResourceStats>>().is_some() {
                // otherwise, is the span a resource? in that case, we also want
                // to enter it, although we don't care about recording poll
                // stats.
            } else {
                return;
            };

            self.current_spans
                .get_or_default()
                .borrow_mut()
                .push(id.clone());

            self.record(|| record::Event::Enter {
                id: id.into_u64(),
                at: self.base_time.to_system_time(now),
            });
        }
    }

    fn on_exit(&self, id: &span::Id, cx: Context<'_, S>) {
        if let Some(span) = cx.span(id) {
            let exts = span.extensions();
            let now = Instant::now();
            // if the span we are exiting is a task or async op, record the
            // poll stats.
            if let Some(stats) = exts.get::<Arc<stats::TaskStats>>() {
                stats.end_poll(now);
            } else if let Some(stats) = exts.get::<Arc<stats::AsyncOpStats>>() {
                stats.end_poll(now);
            } else if exts.get::<Arc<stats::ResourceStats>>().is_some() {
                // otherwise, is the span a resource? in that case, we also want
                // to exit it, although we don't care about recording poll
                // stats.
            } else {
                return;
            };

            self.current_spans.get_or_default().borrow_mut().pop(id);

            self.record(|| record::Event::Exit {
                id: id.into_u64(),
                at: self.base_time.to_system_time(now),
            });
        }
    }

    fn on_close(&self, id: span::Id, cx: Context<'_, S>) {
        if let Some(span) = cx.span(&id) {
            let now = Instant::now();
            let exts = span.extensions();
            if let Some(stats) = exts.get::<Arc<stats::TaskStats>>() {
                stats.drop_task(now);
            } else if let Some(stats) = exts.get::<Arc<stats::AsyncOpStats>>() {
                stats.drop_async_op(now);
            } else if let Some(stats) = exts.get::<Arc<stats::ResourceStats>>() {
                stats.drop_resource(now);
            }
            self.record(|| record::Event::Close {
                id: id.into_u64(),
                at: self.base_time.to_system_time(now),
            });
        }
    }
}

impl fmt::Debug for ConsoleLayer {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ConsoleLayer")
            // mpsc::Sender debug impl is not very useful
            .field("tx", &format_args!("<...>"))
            .field("tx.capacity", &self.tx.capacity())
            .field("shared", &self.shared)
            .field("spawn_callsites", &self.spawn_callsites)
            .field("waker_callsites", &self.waker_callsites)
            .finish()
    }
}

impl Server {
    // XXX(eliza): why is `SocketAddr::new` not `const`???
    /// By default, a [`Server`] binds the IP address `127.0.0.1` to service
    /// remote procedure calls.
    ///
    /// Note that methods like [`init`][`crate::init`] and
    /// [`spawn`][`crate::spawn`] will parse the socket address from the
    /// `TOKIO_CONSOLE_BIND` [environment variable] before falling back on
    /// constructing a socket address from this default.
    ///
    /// See also [`Builder::server_addr`].
    ///
    /// [environment variable]: `Builder::with_default_env`
    pub const DEFAULT_IP: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));

    /// By default, a [`Server`] binds port `6669` to service remote procedure
    /// calls.
    ///
    /// Note that methods like [`init`][`crate::init`] and
    /// [`spawn`][`crate::spawn`] will parse the socket address from the
    /// `TOKIO_CONSOLE_BIND` [environment variable] before falling back on
    /// constructing a socket address from this default.
    ///
    /// See also [`Builder::server_addr`].
    ///
    /// [environment variable]: `Builder::with_default_env`
    pub const DEFAULT_PORT: u16 = 6669;

    /// Starts the gRPC service with the default gRPC settings.
    ///
    /// To configure gRPC server settings before starting the server, use
    /// [`serve_with`] instead. This method is equivalent to calling [`serve_with`]
    /// and providing the default gRPC server settings:
    ///
    /// ```rust
    /// # async fn docs() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
    /// # let (_, server) = console_subscriber::ConsoleLayer::new();
    /// server.serve_with(tonic::transport::Server::default()).await
    /// # }
    /// ```
    ///
    /// [`serve_with`]: Server::serve_with
    pub async fn serve(self) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
        self.serve_with(tonic::transport::Server::default()).await
    }

    /// Starts the gRPC service with the given [`tonic`] gRPC transport server
    /// `builder`.
    ///
    /// The `builder` parameter may be used to configure gRPC-specific settings
    /// prior to starting the server.
    ///
    /// This spawns both the server task and the event aggregation worker
    /// task on the current async runtime.
    ///
    /// [`tonic`]: https://docs.rs/tonic/
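    ///
    /// # Examples
    ///
    /// A sketch of adjusting a transport-level option before serving;
    /// `concurrency_limit_per_connection` is just one example of a [`tonic`]
    /// builder setting:
    ///
    /// ```rust
    /// # async fn docs() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
    /// # let (_, server) = console_subscriber::ConsoleLayer::new();
    /// let builder = tonic::transport::Server::builder()
    ///     .concurrency_limit_per_connection(32);
    /// server.serve_with(builder).await
    /// # }
    /// ```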
    pub async fn serve_with(
        self,
        mut builder: tonic::transport::Server,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
        let addr = self.addr.clone();
        let ServerParts {
            instrument_server,
            aggregator,
        } = self.into_parts();
        let aggregate = spawn_named(aggregator.run(), "console::aggregate");
        let router = builder.add_service(instrument_server);
        let res = match addr {
            ServerAddr::Tcp(addr) => {
                let serve = router.serve(addr);
                spawn_named(serve, "console::serve").await
            }
            #[cfg(unix)]
            ServerAddr::Unix(path) => {
                let incoming = UnixListener::bind(path)?;
                let serve = router.serve_with_incoming(UnixListenerStream::new(incoming));
                spawn_named(serve, "console::serve").await
            }
            #[cfg(feature = "vsock")]
            ServerAddr::Vsock(addr) => {
                let incoming = VsockListener::bind(addr)?.incoming();
                let serve = router.serve_with_incoming(incoming);
                spawn_named(serve, "console::serve").await
            }
        };
        aggregate.abort();
        res?.map_err(Into::into)
    }

    /// Starts the gRPC service with the default gRPC settings and gRPC-Web
    /// support.
    ///
    /// # Examples
    ///
    /// To serve the instrument server with gRPC-Web support with the default
    /// settings:
    ///
    /// ```rust
    /// # async fn docs() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
    /// # let (_, server) = console_subscriber::ConsoleLayer::new();
    /// server.serve_with_grpc_web(tonic::transport::Server::default()).await
    /// # }
    /// ```
    ///
    /// To serve the instrument server with gRPC-Web support and a custom CORS configuration, use the
    /// following code:
    ///
    /// ```rust
    /// # use std::{thread, time::Duration};
    /// #
    /// use console_subscriber::{ConsoleLayer, ServerParts};
    /// use tonic_web::GrpcWebLayer;
    /// use tower_http::cors::{CorsLayer, AllowOrigin};
    /// use http::header::HeaderName;
    /// # use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
    /// # const DEFAULT_MAX_AGE: Duration = Duration::from_secs(24 * 60 * 60);
    /// # const DEFAULT_EXPOSED_HEADERS: [&str; 3] =
    /// #    ["grpc-status", "grpc-message", "grpc-status-details-bin"];
    /// # const DEFAULT_ALLOW_HEADERS: [&str; 5] = [
    /// #    "x-grpc-web",
    /// #    "content-type",
    /// #    "x-user-agent",
    /// #    "grpc-timeout",
    /// #    "user-agent",
    /// # ];
    ///
    /// let (console_layer, server) = ConsoleLayer::builder().with_default_env().build();
    /// # thread::Builder::new()
    /// #    .name("subscriber".into())
    /// #    .spawn(move || {
    /// // Customize the CORS configuration.
    /// let cors = CorsLayer::new()
    ///     .allow_origin(AllowOrigin::mirror_request())
    ///     .allow_credentials(true)
    ///     .max_age(DEFAULT_MAX_AGE)
    ///     .expose_headers(
    ///         DEFAULT_EXPOSED_HEADERS
    ///             .iter()
    ///             .cloned()
    ///             .map(HeaderName::from_static)
    ///             .collect::<Vec<HeaderName>>(),
    ///     )
    ///     .allow_headers(
    ///         DEFAULT_ALLOW_HEADERS
    ///             .iter()
    ///             .cloned()
    ///             .map(HeaderName::from_static)
    ///             .collect::<Vec<HeaderName>>(),
    ///     );
    /// #       let runtime = tokio::runtime::Builder::new_current_thread()
    /// #           .enable_all()
    /// #           .build()
    /// #           .expect("console subscriber runtime initialization failed");
    /// #       runtime.block_on(async move {
    ///
    /// let ServerParts {
    ///     instrument_server,
    ///     aggregator,
    ///     ..
    /// } = server.into_parts();
    /// tokio::spawn(aggregator.run());
    ///
    /// // Serve the instrument server with gRPC-Web support and the CORS configuration.
    /// let router = tonic::transport::Server::builder()
    ///     .accept_http1(true)
    ///     .layer(cors)
    ///     .layer(GrpcWebLayer::new())
    ///     .add_service(instrument_server);
    /// let serve = router.serve(std::net::SocketAddr::new(
    ///     std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)),
    ///     // 6669 is a restricted port on Chrome, so we cannot use it. We use a different port instead.
    ///     9999,
    /// ));
    ///
    /// // Finally, spawn the server.
    /// serve.await.expect("console subscriber server failed");
    /// #       });
    /// #   })
    /// #   .expect("console subscriber could not spawn thread");
    /// # tracing_subscriber::registry().with(console_layer).init();
    /// ```
    ///
    /// For a complete code example, see the `grpc-web` example in the
    /// examples directory.
    ///
    /// [`Router::serve`]: fn@tonic::transport::server::Router::serve
    #[cfg(feature = "grpc-web")]
    pub async fn serve_with_grpc_web(
        self,
        builder: tonic::transport::Server,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
        let addr = self.addr.clone();
        let ServerParts {
            instrument_server,
            aggregator,
        } = self.into_parts();
        let router = builder
            .accept_http1(true)
            .add_service(tonic_web::enable(instrument_server));
        let aggregate = spawn_named(aggregator.run(), "console::aggregate");
        let res = match addr {
            ServerAddr::Tcp(addr) => {
                let serve = router.serve(addr);
                spawn_named(serve, "console::serve").await
            }
            #[cfg(unix)]
            ServerAddr::Unix(path) => {
                let incoming = UnixListener::bind(path)?;
                let serve = router.serve_with_incoming(UnixListenerStream::new(incoming));
                spawn_named(serve, "console::serve").await
            }
            #[cfg(feature = "vsock")]
            ServerAddr::Vsock(addr) => {
                let incoming = VsockListener::bind(addr)?.incoming();
                let serve = router.serve_with_incoming(incoming);
                spawn_named(serve, "console::serve").await
            }
        };
        aggregate.abort();
        res?.map_err(Into::into)
    }

    /// Returns the parts needed to spawn a gRPC server and the aggregator that
    /// supplies it.
    ///
    /// Note that a server spawned in this way will disregard any value set by
    /// [`Builder::server_addr`], as the user becomes responsible for defining
    /// the address when calling [`Router::serve`].
    ///
    /// Additionally, the user of this API must ensure that the [`Aggregator`]
    /// is running for as long as the gRPC server is. If the server stops
    /// running, the aggregator task can be aborted.
    ///
    /// # Examples
    ///
    /// The parts can be used to serve the instrument server together with
    /// other endpoints from the same gRPC server.
    ///
    /// ```
    /// use console_subscriber::{ConsoleLayer, ServerParts};
    ///
    /// # let runtime = tokio::runtime::Builder::new_current_thread()
    /// #     .enable_all()
    /// #     .build()
    /// #     .unwrap();
    /// # runtime.block_on(async {
    /// let (console_layer, server) = ConsoleLayer::builder().build();
    /// let ServerParts {
    ///     instrument_server,
    ///     aggregator,
    ///     ..
    /// } = server.into_parts();
    ///
    /// let aggregator_handle = tokio::spawn(aggregator.run());
    /// let router = tonic::transport::Server::builder()
    ///     //.add_service(some_other_service)
    ///     .add_service(instrument_server);
    /// let serve = router.serve(std::net::SocketAddr::new(
    ///     std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)),
    ///     6669,
    /// ));
    ///
    /// // Finally, spawn the server.
    /// tokio::spawn(serve);
    /// # // Avoid a warning that `console_layer` and `aggregator_handle` are unused.
    /// # drop(console_layer);
    /// # let mut aggregator_handle = aggregator_handle;
    /// # aggregator_handle.abort();
    /// # });
    /// ```
    ///
    /// [`Router::serve`]: fn@tonic::transport::server::Router::serve
    pub fn into_parts(mut self) -> ServerParts {
        let aggregator = self
            .aggregator
            .take()
            .expect("cannot start server multiple times");

        let instrument_server = proto::instrument::instrument_server::InstrumentServer::new(self);

        ServerParts {
            instrument_server,
            aggregator,
        }
    }
}

/// Server Parts
///
/// This struct contains the parts returned by [`Server::into_parts`]. It may contain
/// further parts in the future, and as such is marked as `non_exhaustive`.
///
/// The `InstrumentServer<Server>` can be used to construct a router which
/// can be added to a [`tonic`] gRPC server.
///
/// The `aggregator`'s [`run`][crate::Aggregator::run] future should be running
/// as long as the server is. Generally, this future should be spawned onto an
/// appropriate runtime and then aborted if the server gets shut down.
///
/// See the [`Server::into_parts`] documentation for usage.
#[non_exhaustive]
pub struct ServerParts {
    /// The instrument server.
    ///
    /// See the documentation for [`InstrumentServer`] for details.
    pub instrument_server: InstrumentServer<Server>,

    /// The aggregator.
    ///
    /// Responsible for collecting and preparing traces for the instrument server
    /// to send to its clients.
    ///
    /// The aggregator should be [`run`] when the instrument server is started.
    /// If the server stops running for any reason, the aggregator task can be
    /// aborted.
    ///
    /// [`run`]: fn@crate::Aggregator::run
    pub aggregator: Aggregator,
}

#[tonic::async_trait]
impl proto::instrument::instrument_server::Instrument for Server {
    type WatchUpdatesStream =
        tokio_stream::wrappers::ReceiverStream<Result<proto::instrument::Update, tonic::Status>>;
    type WatchTaskDetailsStream =
        tokio_stream::wrappers::ReceiverStream<Result<proto::tasks::TaskDetails, tonic::Status>>;
    type WatchStateStream =
        tokio_stream::wrappers::ReceiverStream<Result<proto::instrument::State, tonic::Status>>;
    async fn watch_updates(
        &self,
        req: tonic::Request<proto::instrument::InstrumentRequest>,
    ) -> Result<tonic::Response<Self::WatchUpdatesStream>, tonic::Status> {
        match req.remote_addr() {
            Some(addr) => tracing::debug!(client.addr = %addr, "starting a new watch"),
            None => tracing::debug!(client.addr = %"<unknown>", "starting a new watch"),
        }
        let permit = self.subscribe.reserve().await.map_err(|_| {
            tonic::Status::internal("cannot start new watch, aggregation task is not running")
        })?;
        let (tx, rx) = mpsc::channel(self.client_buffer);
        permit.send(Command::Instrument(Watch(tx)));
        tracing::debug!("watch started");
        let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
        Ok(tonic::Response::new(stream))
    }

    async fn watch_task_details(
        &self,
        req: tonic::Request<proto::instrument::TaskDetailsRequest>,
    ) -> Result<tonic::Response<Self::WatchTaskDetailsStream>, tonic::Status> {
        let task_id = req
            .into_inner()
            .id
            .ok_or_else(|| tonic::Status::invalid_argument("missing task_id"))?
            .id;

        // `tracing` reserves span ID 0 as a niche optimization for `Option<Id>`.
        let id = std::num::NonZeroU64::new(task_id)
            .map(Id::from_non_zero_u64)
            .ok_or_else(|| tonic::Status::invalid_argument("task_id cannot be 0"))?;

        let permit = self.subscribe.reserve().await.map_err(|_| {
            tonic::Status::internal("cannot start new watch, aggregation task is not running")
        })?;

        // Check with the aggregator task to request a stream if the task exists.
        let (stream_sender, stream_recv) = oneshot::channel();
        permit.send(Command::WatchTaskDetail(WatchRequest {
            id,
            stream_sender,
            buffer: self.client_buffer,
        }));
        // If the aggregator drops the sender, the task doesn't exist.
        let rx = stream_recv.await.map_err(|_| {
            tracing::warn!(id = ?task_id, "requested task not found");
            tonic::Status::not_found("task not found")
        })?;

        tracing::debug!(id = ?task_id, "task details watch started");
        let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
        Ok(tonic::Response::new(stream))
    }

    async fn watch_state(
        &self,
        _req: tonic::Request<proto::instrument::StateRequest>,
    ) -> Result<tonic::Response<Self::WatchStateStream>, tonic::Status> {
        let (stream_sender, stream_recv) = mpsc::channel(self.client_buffer);
        self.subscribe
            .send(Command::WatchState(Watch(stream_sender)))
            .await
            .map_err(|_| {
                tonic::Status::internal("cannot get state, aggregation task is not running")
            })?;
        let stream = tokio_stream::wrappers::ReceiverStream::new(stream_recv);
        Ok(tonic::Response::new(stream))
    }

    async fn pause(
        &self,
        _req: tonic::Request<proto::instrument::PauseRequest>,
    ) -> Result<tonic::Response<proto::instrument::PauseResponse>, tonic::Status> {
        self.subscribe.send(Command::Pause).await.map_err(|_| {
            tonic::Status::internal("cannot pause, aggregation task is not running")
        })?;
        Ok(tonic::Response::new(proto::instrument::PauseResponse {}))
    }

    async fn resume(
        &self,
        _req: tonic::Request<proto::instrument::ResumeRequest>,
    ) -> Result<tonic::Response<proto::instrument::ResumeResponse>, tonic::Status> {
        self.subscribe.send(Command::Resume).await.map_err(|_| {
            tonic::Status::internal("cannot resume, aggregation task is not running")
        })?;
        Ok(tonic::Response::new(proto::instrument::ResumeResponse {}))
    }
}

impl WakeOp {
    /// Returns `true` if `self` is a `Wake` or `WakeByRef` event.
    fn is_wake(self) -> bool {
        matches!(self, Self::Wake { .. } | Self::WakeByRef { .. })
    }

    fn self_wake(self, self_wake: bool) -> Self {
        match self {
            Self::Wake { .. } => Self::Wake { self_wake },
            Self::WakeByRef { .. } => Self::WakeByRef { self_wake },
            x => x,
        }
    }
}

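/// Spawns `task` on the current Tokio runtime. When `tokio_unstable` is
/// enabled, the task is given the provided name via the task builder;
/// otherwise the name is ignored.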
#[track_caller]
pub(crate) fn spawn_named<T>(
    task: impl std::future::Future<Output = T> + Send + 'static,
    _name: &str,
) -> tokio::task::JoinHandle<T>
where
    T: Send + 'static,
{
    #[cfg(tokio_unstable)]
    return tokio::task::Builder::new().name(_name).spawn(task).unwrap();

    #[cfg(not(tokio_unstable))]
    tokio::spawn(task)
}