#![doc = include_str!("../README.md")]
use console_api as proto;
use proto::{instrument::instrument_server::InstrumentServer, resources::resource};
use serde::Serialize;
#[cfg(feature = "vsock")]
use tokio_vsock::VsockListener;

use std::{
    cell::RefCell,
    fmt,
    net::{IpAddr, Ipv4Addr},
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    },
    time::{Duration, Instant},
};
use thread_local::ThreadLocal;
#[cfg(unix)]
use tokio::net::UnixListener;
use tokio::sync::{mpsc, oneshot};
#[cfg(unix)]
use tokio_stream::wrappers::UnixListenerStream;
use tracing_core::{
    span::{self, Id},
    subscriber::{self, Subscriber},
    Metadata,
};
use tracing_subscriber::{
    layer::Context,
    registry::{Extensions, LookupSpan},
    Layer,
};

mod aggregator;
mod attribute;
mod builder;
mod callsites;
mod record;
mod stack;
mod stats;
pub(crate) mod sync;
mod visitors;

pub use aggregator::Aggregator;
pub use builder::{Builder, ServerAddr};
use callsites::Callsites;
use record::Recorder;
use stack::SpanStack;
use visitors::{AsyncOpVisitor, ResourceVisitor, ResourceVisitorResult, TaskVisitor, WakerVisitor};

pub use builder::{init, spawn};

use crate::visitors::{PollOpVisitor, StateUpdateVisitor};
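
/// A [`Layer`] that records Tokio task, resource, and async-op
/// instrumentation and forwards it to a paired [`Server`], which serves the
/// data to console clients.
///
/// A minimal sketch of wiring the layer into a subscriber (this assumes the
/// application is compiled with `RUSTFLAGS="--cfg tokio_unstable"`):
///
/// ```ignore
/// use tracing_subscriber::prelude::*;
///
/// let (layer, server) = console_subscriber::ConsoleLayer::builder().build();
/// tracing_subscriber::registry().with(layer).init();
/// // Drive the gRPC server alongside the instrumented application.
/// tokio::spawn(server.serve());
/// ```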
pub struct ConsoleLayer {
    /// Tracks the currently-entered spans on each thread, so that events can
    /// be attributed to the correct task, resource, or async-op context.
    current_spans: ThreadLocal<RefCell<SpanStack>>,
    /// Sends events to the aggregator task.
    tx: mpsc::Sender<Event>,
    /// State shared with the aggregator task.
    shared: Arc<Shared>,
    /// When the event channel's remaining capacity drops to this value, a
    /// flush is triggered so the aggregator drains the channel.
    flush_under_capacity: usize,

    /// Callsites for spans representing spawned tasks.
    spawn_callsites: Callsites<8>,

    /// Callsites for events representing waker operations.
    waker_callsites: Callsites<16>,

    /// Callsites for spans representing resources.
    resource_callsites: Callsites<32>,

    /// Callsites for spans representing async operations on resources.
    async_op_callsites: Callsites<32>,

    /// Callsites for spans representing polls of async operations.
    async_op_poll_callsites: Callsites<32>,

    /// Callsites for events representing poll operations on resources.
    poll_op_callsites: Callsites<32>,

    /// Callsites for events representing resource state updates.
    resource_state_update_callsites: Callsites<32>,

    /// Callsites for events representing async-op state updates.
    async_op_state_update_callsites: Callsites<32>,

    /// If recording is enabled, writes events to a recording file.
    recorder: Option<Recorder>,

    /// Anchor used to convert `Instant`s into wall-clock `SystemTime`s.
    base_time: stats::TimeAnchor,

    /// Maximum value recorded in the poll-duration histogram, in nanoseconds.
    max_poll_duration_nanos: u64,

    /// Maximum value recorded in the scheduled-duration histogram, in
    /// nanoseconds.
    max_scheduled_duration_nanos: u64,
}

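/// The server half of the console subscriber: owns the [`Aggregator`] and
/// serves the gRPC instrument API to console clients.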
pub struct Server {
    subscribe: mpsc::Sender<Command>,
    addr: ServerAddr,
    aggregator: Option<Aggregator>,
    client_buffer: usize,
}

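/// Conversion into a protobuf wire representation, relative to a base
/// [`stats::TimeAnchor`] so that `Instant`s can be rendered as wall-clock
/// timestamps.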
pub(crate) trait ToProto {
    type Output;
    fn to_proto(&self, base_time: &stats::TimeAnchor) -> Self::Output;
}

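/// State shared between the `ConsoleLayer` and the aggregator task.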
#[derive(Debug, Default)]
struct Shared {
    /// Used to notify the aggregator task when the event buffer should be
    /// flushed.
    flush: aggregator::Flush,

    /// A counter of task events dropped because the event buffer was at
    /// capacity.
    dropped_tasks: AtomicUsize,

    /// A counter of async-op events dropped because the event buffer was at
    /// capacity.
    dropped_async_ops: AtomicUsize,

    /// A counter of resource events dropped because the event buffer was at
    /// capacity.
    dropped_resources: AtomicUsize,
}

struct Watch<T>(mpsc::Sender<Result<T, tonic::Status>>);

enum Command {
    Instrument(Watch<proto::instrument::Update>),
    WatchTaskDetail(WatchRequest<proto::tasks::TaskDetails>),
    WatchState(Watch<proto::instrument::State>),
    Pause,
    Resume,
}

struct WatchRequest<T> {
    id: Id,
    stream_sender: oneshot::Sender<mpsc::Receiver<Result<T, tonic::Status>>>,
    buffer: usize,
}

#[derive(Debug)]
enum Event {
    Metadata(&'static Metadata<'static>),
    Spawn {
        id: span::Id,
        metadata: &'static Metadata<'static>,
        stats: Arc<stats::TaskStats>,
        fields: Vec<proto::Field>,
        location: Option<proto::Location>,
    },
    Resource {
        id: span::Id,
        parent_id: Option<span::Id>,
        metadata: &'static Metadata<'static>,
        concrete_type: String,
        kind: resource::Kind,
        location: Option<proto::Location>,
        is_internal: bool,
        stats: Arc<stats::ResourceStats>,
    },
    PollOp {
        metadata: &'static Metadata<'static>,
        resource_id: span::Id,
        op_name: String,
        async_op_id: span::Id,
        task_id: span::Id,
        is_ready: bool,
    },
    AsyncResourceOp {
        id: span::Id,
        parent_id: Option<span::Id>,
        resource_id: span::Id,
        metadata: &'static Metadata<'static>,
        source: String,
        stats: Arc<stats::AsyncOpStats>,
    },
}

#[derive(Clone, Debug, Copy, Serialize)]
enum WakeOp {
    Wake { self_wake: bool },
    WakeByRef { self_wake: bool },
    Clone,
    Drop,
}

impl ConsoleLayer {
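    /// Returns a new `ConsoleLayer` and its paired [`Server`] with the
    /// default configuration; equivalent to `ConsoleLayer::builder().build()`.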
    pub fn new() -> (Self, Server) {
        Self::builder().build()
    }

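    /// Returns a [`Builder`] for configuring the layer and server (buffer
    /// capacities, publish interval, retention, server address, and so on).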
    pub fn builder() -> Builder {
        Builder::default()
    }

    fn build(config: Builder) -> (Self, Server) {
        #![allow(clippy::assertions_on_constants)]
        assert!(
            cfg!(any(tokio_unstable, console_without_tokio_unstable)),
            "task tracing requires Tokio to be built with RUSTFLAGS=\"--cfg tokio_unstable\"!"
        );

        let base_time = stats::TimeAnchor::new();
        tracing::debug!(
            config.event_buffer_capacity,
            config.client_buffer_capacity,
            ?config.publish_interval,
            ?config.retention,
            ?config.server_addr,
            ?config.recording_path,
            ?config.filter_env_var,
            ?config.poll_duration_max,
            ?config.scheduled_duration_max,
            ?base_time,
            "configured console subscriber"
        );

        let (tx, events) = mpsc::channel(config.event_buffer_capacity);
        let (subscribe, rpcs) = mpsc::channel(256);
        let shared = Arc::new(Shared::default());
        let aggregator = Aggregator::new(events, rpcs, &config, shared.clone(), base_time.clone());
        // Trigger a flush when the event buffer is half full, so the
        // aggregator can drain it before events start being dropped.
        let flush_under_capacity = config.event_buffer_capacity / 2;
        let recorder = config
            .recording_path
            .as_ref()
            .map(|path| Recorder::new(path).expect("creating recorder"));
        let server = Server {
            aggregator: Some(aggregator),
            addr: config.server_addr,
            subscribe,
            client_buffer: config.client_buffer_capacity,
        };
        let layer = Self {
            current_spans: ThreadLocal::new(),
            tx,
            shared,
            flush_under_capacity,
            spawn_callsites: Callsites::default(),
            waker_callsites: Callsites::default(),
            resource_callsites: Callsites::default(),
            async_op_callsites: Callsites::default(),
            async_op_poll_callsites: Callsites::default(),
            poll_op_callsites: Callsites::default(),
            resource_state_update_callsites: Callsites::default(),
            async_op_state_update_callsites: Callsites::default(),
            recorder,
            base_time,
            max_poll_duration_nanos: config.poll_duration_max.as_nanos() as u64,
            max_scheduled_duration_nanos: config.scheduled_duration_max.as_nanos() as u64,
        };
        (layer, server)
    }
}

impl ConsoleLayer {
    /// The default maximum capacity of the channel of events sent from the
    /// layer to the aggregator.
    pub const DEFAULT_EVENT_BUFFER_CAPACITY: usize = 1024 * 100;

    /// The default maximum capacity of each client's update buffer.
    pub const DEFAULT_CLIENT_BUFFER_CAPACITY: usize = 1024 * 4;

    /// The default interval at which aggregated updates are published to
    /// clients.
    pub const DEFAULT_PUBLISH_INTERVAL: Duration = Duration::from_secs(1);

    /// The default duration for which data about completed spans is retained
    /// (one hour).
    pub const DEFAULT_RETENTION: Duration = Duration::from_secs(60 * 60);

    /// The default maximum value recorded in the poll-duration histogram.
    pub const DEFAULT_POLL_DURATION_MAX: Duration = Duration::from_secs(1);

    /// The default maximum value recorded in the scheduled-duration
    /// histogram.
    pub const DEFAULT_SCHEDULED_DURATION_MAX: Duration = Duration::from_secs(1);

    fn is_spawn(&self, meta: &'static Metadata<'static>) -> bool {
        self.spawn_callsites.contains(meta)
    }

    fn is_resource(&self, meta: &'static Metadata<'static>) -> bool {
        self.resource_callsites.contains(meta)
    }

    fn is_async_op(&self, meta: &'static Metadata<'static>) -> bool {
        self.async_op_callsites.contains(meta)
    }

    fn is_id_spawned<S>(&self, id: &span::Id, cx: &Context<'_, S>) -> bool
    where
        S: Subscriber + for<'a> LookupSpan<'a>,
    {
        cx.span(id)
            .map(|span| self.is_spawn(span.metadata()))
            .unwrap_or(false)
    }

    fn is_id_resource<S>(&self, id: &span::Id, cx: &Context<'_, S>) -> bool
    where
        S: Subscriber + for<'a> LookupSpan<'a>,
    {
        cx.span(id)
            .map(|span| self.is_resource(span.metadata()))
            .unwrap_or(false)
    }

    fn is_id_async_op<S>(&self, id: &span::Id, cx: &Context<'_, S>) -> bool
    where
        S: Subscriber + for<'a> LookupSpan<'a>,
    {
        cx.span(id)
            .map(|span| self.is_async_op(span.metadata()))
            .unwrap_or(false)
    }

    /// Returns the most recently entered span on the stack that matches the
    /// predicate `p`, if any.
    fn first_entered<P>(&self, stack: &SpanStack, p: P) -> Option<span::Id>
    where
        P: Fn(&span::Id) -> bool,
    {
        stack
            .stack()
            .iter()
            .rev()
            .find(|id| p(id.id()))
            .map(|id| id.id())
            .cloned()
    }

    fn send_metadata(&self, dropped: &AtomicUsize, event: Event) -> bool {
        self.send_stats(dropped, move || (event, ())).is_some()
    }

    fn send_stats<S>(
        &self,
        dropped: &AtomicUsize,
        mk_event: impl FnOnce() -> (Event, S),
    ) -> Option<S> {
        use mpsc::error::TrySendError;

        // Only construct the event if there is capacity on the channel, so
        // that the work of building it is not wasted when it would be
        // dropped anyway.
        let sent = match self.tx.try_reserve() {
            Ok(permit) => {
                let (event, stats) = mk_event();
                permit.send(event);
                Some(stats)
            }
            Err(TrySendError::Closed(_)) => {
                // The aggregator task has terminated; there is nothing to do.
                None
            }
            Err(TrySendError::Full(_)) => {
                // The channel is at capacity; count the dropped event so the
                // totals can be reported to clients.
                dropped.fetch_add(1, Ordering::Release);
                None
            }
        };

        // If the channel has drained below the flush threshold, notify the
        // aggregator so it empties the buffer promptly.
        let capacity = self.tx.capacity();
        if capacity <= self.flush_under_capacity {
            self.shared.flush.trigger();
        }

        sent
    }

    fn record(&self, event: impl FnOnce() -> record::Event) {
        if let Some(ref recorder) = self.recorder {
            recorder.record(event());
        }
    }

    fn state_update<S>(
        &self,
        id: &Id,
        event: &tracing::Event<'_>,
        ctx: &Context<'_, S>,
        get_stats: impl for<'a> Fn(&'a Extensions) -> Option<&'a stats::ResourceStats>,
    ) where
        S: Subscriber + for<'a> LookupSpan<'a>,
    {
        let meta_id = event.metadata().into();
        let mut state_update_visitor = StateUpdateVisitor::new(meta_id);
        event.record(&mut state_update_visitor);

        let update = match state_update_visitor.result() {
            Some(update) => update,
            None => return,
        };

        let span = match ctx.span(id) {
            Some(span) => span,
            None => return,
        };

        let exts = span.extensions();
        let stats = match get_stats(&exts) {
            Some(stats) => stats,
            None => return,
        };

        stats.update_attribute(id, &update);

        // If the parent span opted in to inheriting its children's
        // attributes, apply the update to the parent's stats as well.
        if let Some(parent) = stats.parent_id.as_ref().and_then(|parent| ctx.span(parent)) {
            let exts = parent.extensions();
            if let Some(stats) = get_stats(&exts) {
                if stats.inherit_child_attributes {
                    stats.update_attribute(id, &update);
                }
            }
        }
    }
}

impl<S> Layer<S> for ConsoleLayer
where
    S: Subscriber + for<'a> LookupSpan<'a>,
{
    fn register_callsite(&self, meta: &'static Metadata<'static>) -> subscriber::Interest {
        if !meta.is_span() && !meta.is_event() {
            return subscriber::Interest::never();
        }

        // Classify the callsite by its name and target, remember it in the
        // matching callsite set, and pick the counter to increment if
        // sending its metadata fails.
        let dropped = match (meta.name(), meta.target()) {
            ("runtime.spawn", _) | ("task", "tokio::task") => {
                self.spawn_callsites.insert(meta);
                &self.shared.dropped_tasks
            }
            (_, "runtime::waker") | (_, "tokio::task::waker") => {
                self.waker_callsites.insert(meta);
                &self.shared.dropped_tasks
            }
            (ResourceVisitor::RES_SPAN_NAME, _) => {
                self.resource_callsites.insert(meta);
                &self.shared.dropped_resources
            }
            (AsyncOpVisitor::ASYNC_OP_SPAN_NAME, _) => {
                self.async_op_callsites.insert(meta);
                &self.shared.dropped_async_ops
            }
            ("runtime.resource.async_op.poll", _) => {
                self.async_op_poll_callsites.insert(meta);
                &self.shared.dropped_async_ops
            }
            (_, PollOpVisitor::POLL_OP_EVENT_TARGET) => {
                self.poll_op_callsites.insert(meta);
                &self.shared.dropped_async_ops
            }
            (_, StateUpdateVisitor::RE_STATE_UPDATE_EVENT_TARGET) => {
                self.resource_state_update_callsites.insert(meta);
                &self.shared.dropped_resources
            }
            (_, StateUpdateVisitor::AO_STATE_UPDATE_EVENT_TARGET) => {
                self.async_op_state_update_callsites.insert(meta);
                &self.shared.dropped_async_ops
            }
            (_, _) => &self.shared.dropped_tasks,
        };

        self.send_metadata(dropped, Event::Metadata(meta));
        subscriber::Interest::always()
    }

    fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
        let metadata = attrs.metadata();
        if self.is_spawn(metadata) {
            let at = Instant::now();
            let mut task_visitor = TaskVisitor::new(metadata.into());
            attrs.record(&mut task_visitor);
            let (fields, location) = task_visitor.result();
            self.record(|| record::Event::Spawn {
                id: id.into_u64(),
                at: self.base_time.to_system_time(at),
                fields: record::SerializeFields(fields.clone()),
            });
            if let Some(stats) = self.send_stats(&self.shared.dropped_tasks, move || {
                let stats = Arc::new(stats::TaskStats::new(
                    self.max_poll_duration_nanos,
                    self.max_scheduled_duration_nanos,
                    at,
                ));
                let event = Event::Spawn {
                    id: id.clone(),
                    stats: stats.clone(),
                    metadata,
                    fields,
                    location,
                };
                (event, stats)
            }) {
                ctx.span(id)
                    .expect("if `on_new_span` was called, the span must exist; this is a `tracing` bug!")
                    .extensions_mut()
                    .insert(stats);
            }
            return;
        }

        if self.is_resource(metadata) {
            let at = Instant::now();
            let mut resource_visitor = ResourceVisitor::default();
            attrs.record(&mut resource_visitor);
            if let Some(result) = resource_visitor.result() {
                let ResourceVisitorResult {
                    concrete_type,
                    kind,
                    location,
                    is_internal,
                    inherit_child_attrs,
                } = result;
                // A new resource's parent is the most recently entered
                // resource span on this thread, if there is one.
                let parent_id = self.current_spans.get().and_then(|stack| {
                    self.first_entered(&stack.borrow(), |id| self.is_id_resource(id, &ctx))
                });
                if let Some(stats) = self.send_stats(&self.shared.dropped_resources, move || {
                    let stats = Arc::new(stats::ResourceStats::new(
                        at,
                        inherit_child_attrs,
                        parent_id.clone(),
                    ));
                    let event = Event::Resource {
                        id: id.clone(),
                        parent_id,
                        metadata,
                        concrete_type,
                        kind,
                        location,
                        is_internal,
                        stats: stats.clone(),
                    };
                    (event, stats)
                }) {
                    ctx.span(id)
                        .expect("if `on_new_span` was called, the span must exist; this is a `tracing` bug!")
                        .extensions_mut()
                        .insert(stats);
                }
            }
            return;
        }

        if self.is_async_op(metadata) {
            let at = Instant::now();
            let mut async_op_visitor = AsyncOpVisitor::default();
            attrs.record(&mut async_op_visitor);
            if let Some((source, inherit_child_attrs)) = async_op_visitor.result() {
                let resource_id = self.current_spans.get().and_then(|stack| {
                    self.first_entered(&stack.borrow(), |id| self.is_id_resource(id, &ctx))
                });

                let parent_id = self.current_spans.get().and_then(|stack| {
                    self.first_entered(&stack.borrow(), |id| self.is_id_async_op(id, &ctx))
                });

                // An async op span must occur inside a resource span;
                // otherwise it cannot be attributed and is skipped.
                if let Some(resource_id) = resource_id {
                    if let Some(stats) =
                        self.send_stats(&self.shared.dropped_async_ops, move || {
                            let stats = Arc::new(stats::AsyncOpStats::new(
                                at,
                                inherit_child_attrs,
                                parent_id.clone(),
                            ));
                            let event = Event::AsyncResourceOp {
                                id: id.clone(),
                                parent_id,
                                resource_id,
                                metadata,
                                source,
                                stats: stats.clone(),
                            };
                            (event, stats)
                        })
                    {
                        ctx.span(id)
                            .expect("if `on_new_span` was called, the span must exist; this is a `tracing` bug!")
                            .extensions_mut()
                            .insert(stats);
                    }
                }
            }
        }
    }

    fn on_event(&self, event: &tracing::Event<'_>, ctx: Context<'_, S>) {
        let metadata = event.metadata();
        if self.waker_callsites.contains(metadata) {
            let at = Instant::now();
            let mut visitor = WakerVisitor::default();
            event.record(&mut visitor);
            if let Some((id, mut op)) = visitor.result() {
                if let Some(span) = ctx.span(&id) {
                    let exts = span.extensions();
                    if let Some(stats) = exts.get::<Arc<stats::TaskStats>>() {
                        if op.is_wake() {
                            // A wake is a self-wake if it occurs while the
                            // woken task's own span is currently entered.
                            let self_wake = self
                                .current_spans
                                .get()
                                .map(|spans| spans.borrow().iter().any(|span| span == &id))
                                .unwrap_or(false);
                            op = op.self_wake(self_wake);
                        }

                        stats.record_wake_op(op, at);
                        self.record(|| record::Event::Waker {
                            id: id.into_u64(),
                            at: self.base_time.to_system_time(at),
                            op,
                        });
                    }
                }
            }
            return;
        }

        if self.poll_op_callsites.contains(metadata) {
            let resource_id = self.current_spans.get().and_then(|stack| {
                self.first_entered(&stack.borrow(), |id| self.is_id_resource(id, &ctx))
            });
            // A poll op event is only meaningful inside a resource span.
            if let Some(resource_id) = resource_id {
                let mut poll_op_visitor = PollOpVisitor::default();
                event.record(&mut poll_op_visitor);
                if let Some((op_name, is_ready)) = poll_op_visitor.result() {
                    let task_and_async_op_ids = self.current_spans.get().and_then(|stack| {
                        let stack = stack.borrow();
                        let task_id =
                            self.first_entered(&stack, |id| self.is_id_spawned(id, &ctx))?;
                        let async_op_id =
                            self.first_entered(&stack, |id| self.is_id_async_op(id, &ctx))?;
                        Some((task_id, async_op_id))
                    });
                    // The op must also occur inside a task and an async op
                    // span to be attributed.
                    if let Some((task_id, async_op_id)) = task_and_async_op_ids {
                        if let Some(span) = ctx.span(&async_op_id) {
                            let exts = span.extensions();
                            if let Some(stats) = exts.get::<Arc<stats::AsyncOpStats>>() {
                                stats.set_task_id(&task_id);
                            }
                        }

                        self.send_stats(&self.shared.dropped_async_ops, || {
                            let event = Event::PollOp {
                                metadata,
                                op_name,
                                resource_id,
                                async_op_id,
                                task_id,
                                is_ready,
                            };
                            (event, ())
                        });
                    }
                }
            }
            return;
        }

        if self.resource_state_update_callsites.contains(metadata) {
            let resource_id = self.current_spans.get().and_then(|stack| {
                self.first_entered(&stack.borrow(), |id| self.is_id_resource(id, &ctx))
            });
            if let Some(id) = resource_id {
                self.state_update(&id, event, &ctx, |exts| {
                    exts.get::<Arc<stats::ResourceStats>>()
                        .map(<Arc<stats::ResourceStats> as std::ops::Deref>::deref)
                });
            }

            return;
        }

        if self.async_op_state_update_callsites.contains(metadata) {
            let async_op_id = self.current_spans.get().and_then(|stack| {
                self.first_entered(&stack.borrow(), |id| self.is_id_async_op(id, &ctx))
            });
            if let Some(id) = async_op_id {
                self.state_update(&id, event, &ctx, |exts| {
                    let async_op = exts.get::<Arc<stats::AsyncOpStats>>()?;
                    Some(&async_op.stats)
                });
            }
        }
    }

    fn on_enter(&self, id: &span::Id, cx: Context<'_, S>) {
        if let Some(span) = cx.span(id) {
            let now = Instant::now();
            let exts = span.extensions();
            if let Some(stats) = exts.get::<Arc<stats::TaskStats>>() {
                stats.start_poll(now);
            } else if let Some(stats) = exts.get::<Arc<stats::AsyncOpStats>>() {
                stats.start_poll(now);
            } else if exts.get::<Arc<stats::ResourceStats>>().is_some() {
                // A resource span has no poll stats, but is still tracked on
                // the span stack below.
            } else {
                // Not a span the console tracks; skip it entirely.
                return;
            };

            self.current_spans
                .get_or_default()
                .borrow_mut()
                .push(id.clone());

            self.record(|| record::Event::Enter {
                id: id.into_u64(),
                at: self.base_time.to_system_time(now),
            });
        }
    }

    fn on_exit(&self, id: &span::Id, cx: Context<'_, S>) {
        if let Some(span) = cx.span(id) {
            let exts = span.extensions();
            let now = Instant::now();
            if let Some(stats) = exts.get::<Arc<stats::TaskStats>>() {
                stats.end_poll(now);
            } else if let Some(stats) = exts.get::<Arc<stats::AsyncOpStats>>() {
                stats.end_poll(now);
            } else if exts.get::<Arc<stats::ResourceStats>>().is_some() {
                // A resource span has no poll stats, but is still removed
                // from the span stack below.
            } else {
                return;
            };

            self.current_spans.get_or_default().borrow_mut().pop(id);

            self.record(|| record::Event::Exit {
                id: id.into_u64(),
                at: self.base_time.to_system_time(now),
            });
        }
    }

    fn on_close(&self, id: span::Id, cx: Context<'_, S>) {
        if let Some(span) = cx.span(&id) {
            let now = Instant::now();
            let exts = span.extensions();
            if let Some(stats) = exts.get::<Arc<stats::TaskStats>>() {
                stats.drop_task(now);
            } else if let Some(stats) = exts.get::<Arc<stats::AsyncOpStats>>() {
                stats.drop_async_op(now);
            } else if let Some(stats) = exts.get::<Arc<stats::ResourceStats>>() {
                stats.drop_resource(now);
            }
            self.record(|| record::Event::Close {
                id: id.into_u64(),
                at: self.base_time.to_system_time(now),
            });
        }
    }
}

impl fmt::Debug for ConsoleLayer {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ConsoleLayer")
            .field("tx", &format_args!("<...>"))
            .field("tx.capacity", &self.tx.capacity())
            .field("shared", &self.shared)
            .field("spawn_callsites", &self.spawn_callsites)
            .field("waker_callsites", &self.waker_callsites)
            .finish()
    }
}

impl Server {
    /// The default IP address the server listens on: `127.0.0.1` (localhost).
    pub const DEFAULT_IP: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));

    /// The default port the server listens on: `6669`.
    pub const DEFAULT_PORT: u16 = 6669;

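    /// Starts the aggregator task and serves the gRPC instrument API with the
    /// default [`tonic`] transport server, running until the server exits.
    ///
    /// A minimal sketch of driving the server on an already-running Tokio
    /// runtime:
    ///
    /// ```ignore
    /// let (layer, server) = console_subscriber::ConsoleLayer::new();
    /// tokio::spawn(async move {
    ///     server.serve().await.expect("console server failed");
    /// });
    /// ```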
    pub async fn serve(self) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
        self.serve_with(tonic::transport::Server::default()).await
    }

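    /// Like [`serve`](Self::serve), but uses the provided
    /// [`tonic::transport::Server`] builder, allowing the caller to configure
    /// transport-level settings before the instrument service is added.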
    pub async fn serve_with(
        self,
        mut builder: tonic::transport::Server,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
        let addr = self.addr.clone();
        let ServerParts {
            instrument_server,
            aggregator,
        } = self.into_parts();
        let aggregate = spawn_named(aggregator.run(), "console::aggregate");
        let router = builder.add_service(instrument_server);
        let res = match addr {
            ServerAddr::Tcp(addr) => {
                let serve = router.serve(addr);
                spawn_named(serve, "console::serve").await
            }
            #[cfg(unix)]
            ServerAddr::Unix(path) => {
                let incoming = UnixListener::bind(path)?;
                let serve = router.serve_with_incoming(UnixListenerStream::new(incoming));
                spawn_named(serve, "console::serve").await
            }
            #[cfg(feature = "vsock")]
            ServerAddr::Vsock(addr) => {
                let incoming = VsockListener::bind(addr)?.incoming();
                let serve = router.serve_with_incoming(incoming);
                spawn_named(serve, "console::serve").await
            }
        };
        // Once the server task completes, stop the aggregator as well.
        aggregate.abort();
        res?.map_err(Into::into)
    }

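    /// Like [`serve_with`](Self::serve_with), but wraps the instrument
    /// service with [`tonic_web`] and accepts HTTP/1.1 connections, so that
    /// gRPC-Web clients can connect. Available only with the `grpc-web`
    /// feature.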
    #[cfg(feature = "grpc-web")]
    pub async fn serve_with_grpc_web(
        self,
        builder: tonic::transport::Server,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
        let addr = self.addr.clone();
        let ServerParts {
            instrument_server,
            aggregator,
        } = self.into_parts();
        let router = builder
            .accept_http1(true)
            .add_service(tonic_web::enable(instrument_server));
        let aggregate = spawn_named(aggregator.run(), "console::aggregate");
        let res = match addr {
            ServerAddr::Tcp(addr) => {
                let serve = router.serve(addr);
                spawn_named(serve, "console::serve").await
            }
            #[cfg(unix)]
            ServerAddr::Unix(path) => {
                let incoming = UnixListener::bind(path)?;
                let serve = router.serve_with_incoming(UnixListenerStream::new(incoming));
                spawn_named(serve, "console::serve").await
            }
            #[cfg(feature = "vsock")]
            ServerAddr::Vsock(addr) => {
                let incoming = VsockListener::bind(addr)?.incoming();
                let serve = router.serve_with_incoming(incoming);
                spawn_named(serve, "console::serve").await
            }
        };
        aggregate.abort();
        res?.map_err(Into::into)
    }

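    /// Splits this server into its [`InstrumentServer`] service and its
    /// [`Aggregator`], so they can be run on a caller-managed runtime or the
    /// service can be added to an existing [`tonic`] router.
    ///
    /// The aggregator must be running for the service to produce data; a
    /// minimal sketch (the bind address here is a placeholder):
    ///
    /// ```ignore
    /// let ServerParts {
    ///     instrument_server,
    ///     aggregator,
    ///     ..
    /// } = server.into_parts();
    /// tokio::spawn(aggregator.run());
    /// tonic::transport::Server::builder()
    ///     .add_service(instrument_server)
    ///     .serve("127.0.0.1:6669".parse()?)
    ///     .await?;
    /// ```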
    pub fn into_parts(mut self) -> ServerParts {
        let aggregator = self
            .aggregator
            .take()
            .expect("cannot start server multiple times");

        let instrument_server = proto::instrument::instrument_server::InstrumentServer::new(self);

        ServerParts {
            instrument_server,
            aggregator,
        }
    }
}

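/// The parts of a [`Server`], as returned by [`Server::into_parts`].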
#[non_exhaustive]
pub struct ServerParts {
    /// The gRPC instrument service.
    pub instrument_server: InstrumentServer<Server>,

    /// The aggregator that collects events and answers client commands; its
    /// `run()` future must be spawned for the service to produce data.
    pub aggregator: Aggregator,
}

#[tonic::async_trait]
impl proto::instrument::instrument_server::Instrument for Server {
    type WatchUpdatesStream =
        tokio_stream::wrappers::ReceiverStream<Result<proto::instrument::Update, tonic::Status>>;
    type WatchTaskDetailsStream =
        tokio_stream::wrappers::ReceiverStream<Result<proto::tasks::TaskDetails, tonic::Status>>;
    type WatchStateStream =
        tokio_stream::wrappers::ReceiverStream<Result<proto::instrument::State, tonic::Status>>;
    async fn watch_updates(
        &self,
        req: tonic::Request<proto::instrument::InstrumentRequest>,
    ) -> Result<tonic::Response<Self::WatchUpdatesStream>, tonic::Status> {
        match req.remote_addr() {
            Some(addr) => tracing::debug!(client.addr = %addr, "starting a new watch"),
            None => tracing::debug!(client.addr = %"<unknown>", "starting a new watch"),
        }
        let permit = self.subscribe.reserve().await.map_err(|_| {
            tonic::Status::internal("cannot start new watch, aggregation task is not running")
        })?;
        let (tx, rx) = mpsc::channel(self.client_buffer);
        permit.send(Command::Instrument(Watch(tx)));
        tracing::debug!("watch started");
        let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
        Ok(tonic::Response::new(stream))
    }

    async fn watch_task_details(
        &self,
        req: tonic::Request<proto::instrument::TaskDetailsRequest>,
    ) -> Result<tonic::Response<Self::WatchTaskDetailsStream>, tonic::Status> {
        let task_id = req
            .into_inner()
            .id
            .ok_or_else(|| tonic::Status::invalid_argument("missing task_id"))?
            .id;

        // `tracing` span IDs are non-zero, so an ID of zero cannot name a task.
        let id = std::num::NonZeroU64::new(task_id)
            .map(Id::from_non_zero_u64)
            .ok_or_else(|| tonic::Status::invalid_argument("task_id cannot be 0"))?;

        let permit = self.subscribe.reserve().await.map_err(|_| {
            tonic::Status::internal("cannot start new watch, aggregation task is not running")
        })?;

        // Ask the aggregator for a stream of updates for this task; it
        // replies over the oneshot channel if the task exists.
        let (stream_sender, stream_recv) = oneshot::channel();
        permit.send(Command::WatchTaskDetail(WatchRequest {
            id,
            stream_sender,
            buffer: self.client_buffer,
        }));
        // If the aggregator drops the sender without responding, the
        // requested task was not found.
        let rx = stream_recv.await.map_err(|_| {
            tracing::warn!(id = ?task_id, "requested task not found");
            tonic::Status::not_found("task not found")
        })?;

        tracing::debug!(id = ?task_id, "task details watch started");
        let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
        Ok(tonic::Response::new(stream))
    }

    async fn watch_state(
        &self,
        _req: tonic::Request<proto::instrument::StateRequest>,
    ) -> Result<tonic::Response<Self::WatchStateStream>, tonic::Status> {
        let (stream_sender, stream_recv) = mpsc::channel(self.client_buffer);
        self.subscribe
            .send(Command::WatchState(Watch(stream_sender)))
            .await
            .map_err(|_| {
                tonic::Status::internal("cannot get state, aggregation task is not running")
            })?;
        let stream = tokio_stream::wrappers::ReceiverStream::new(stream_recv);
        Ok(tonic::Response::new(stream))
    }

    async fn pause(
        &self,
        _req: tonic::Request<proto::instrument::PauseRequest>,
    ) -> Result<tonic::Response<proto::instrument::PauseResponse>, tonic::Status> {
        self.subscribe.send(Command::Pause).await.map_err(|_| {
            tonic::Status::internal("cannot pause, aggregation task is not running")
        })?;
        Ok(tonic::Response::new(proto::instrument::PauseResponse {}))
    }

    async fn resume(
        &self,
        _req: tonic::Request<proto::instrument::ResumeRequest>,
    ) -> Result<tonic::Response<proto::instrument::ResumeResponse>, tonic::Status> {
        self.subscribe.send(Command::Resume).await.map_err(|_| {
            tonic::Status::internal("cannot resume, aggregation task is not running")
        })?;
        Ok(tonic::Response::new(proto::instrument::ResumeResponse {}))
    }
}

impl WakeOp {
    /// Returns `true` if `self` is a `Wake` or `WakeByRef` operation.
    fn is_wake(self) -> bool {
        matches!(self, Self::Wake { .. } | Self::WakeByRef { .. })
    }

    /// Sets the `self_wake` flag on a `Wake` or `WakeByRef` operation,
    /// leaving `Clone` and `Drop` unchanged.
    fn self_wake(self, self_wake: bool) -> Self {
        match self {
            Self::Wake { .. } => Self::Wake { self_wake },
            Self::WakeByRef { .. } => Self::WakeByRef { self_wake },
            x => x,
        }
    }
}

/// Spawns a future as a named task when Tokio's unstable `task::Builder` API
/// is available (`--cfg tokio_unstable`); otherwise falls back to an unnamed
/// `tokio::spawn`.
#[track_caller]
pub(crate) fn spawn_named<T>(
    task: impl std::future::Future<Output = T> + Send + 'static,
    _name: &str,
) -> tokio::task::JoinHandle<T>
where
    T: Send + 'static,
{
    #[cfg(tokio_unstable)]
    return tokio::task::Builder::new().name(_name).spawn(task).unwrap();

    #[cfg(not(tokio_unstable))]
    tokio::spawn(task)
}