Index: openacs-4/packages/xotcl-core/tcl/bgdelivery-procs.tcl
===================================================================
RCS file: /usr/local/cvsroot/openacs-4/packages/xotcl-core/tcl/bgdelivery-procs.tcl,v
diff -u -r1.45 -r1.46
--- openacs-4/packages/xotcl-core/tcl/bgdelivery-procs.tcl 17 Feb 2013 11:48:06 -0000 1.45
+++ openacs-4/packages/xotcl-core/tcl/bgdelivery-procs.tcl 21 Feb 2013 12:31:33 -0000 1.46
@@ -288,35 +288,86 @@
}
}
- Subscriber proc broadcast {key msg} {
+ Subscriber instproc close {} {
+ set channel [my channel]
+ #
+ # It is important to make the channel non-blocking for the close,
+ # since otherwise the close operation might block and bring all of
+ # bgdelivery to a halt.
+ #
+ catch {fconfigure $channel -blocking false}
+ catch {close $channel}
+ }
+
+ Subscriber instproc sweep {args} {
+ #
+ # when the sweeper raises an error the caller (foreachSubscriber)
+ # destroys the instance. In this step the peer connection is close
+ # as well.
+ #
+ set channel [my channel]
+ if {[catch {set eof [eof $channel]}]} {set eof 1}
+ my log "sweep [my channel] EOF $eof"
+ if {$eof} {
+ error "connection $channel closed by peer"
+ }
+ # make an io-attempt to trigger EOF
+ if {[catch {
+ set blocking [fconfigure $channel -blocking]
+ fconfigure $channel -blocking false
+ set x [read $channel]
+ fconfigure $channel -blocking $blocking
+ } errorMsg]} {
+ error "connection $channel closed due to IO error"
+ }
+ }
+
+ Subscriber instproc send {msg} {
+ my log ""
+ if {[my mode] eq "scripted"} {
+ set smsg "\n"
+ set smsg [format %x [string length $smsg]]\r\n$smsg\r\n
+ } else {
+ set smsg $msg
+ }
+ #my log "-- sending to subscriber for [my key] $smsg ch=[my channel] \
+ # mode=[my mode], user_id [my user_id]"
+ puts -nonewline [my channel] $smsg
+ flush [my channel]
+ }
+
+ Subscriber proc foreachSubscriber {key method {argument ""}} {
+ my msg "$key $method '$argument'"
my instvar subscriptions
if {[info exists subscriptions($key)]} {
set subs1 [list]
foreach s $subscriptions($key) {
- if {[catch {
- if {[$s mode] eq "scripted"} {
- set smsg "\n"
- set smsg [format %x [string length $smsg]]\r\n$smsg\r\n
- } else {
- set smsg $msg
- }
- my log "-- sending to subscriber for $key $smsg ch=[$s channel] \
- mode=[$s mode], user_id [$s user_id]"
- puts -nonewline [$s channel] $smsg
- flush [$s channel]
- } errmsg]} {
- ns_log notice "error in send to subscriber (key=$key): $errmsg"
- catch {close [$s channel]}
+ if {[catch {$s $method $argument} errMsg]} {
+ ns_log notice "error in $method to subscriber $s (key=$key): $errMsg"
$s destroy
} else {
lappend subs1 $s
}
}
set subscriptions($key) $subs1
}
+ }
+
+ Subscriber proc broadcast {key msg} {
+ my foreachSubscriber $key send $msg
incr ::message_count
}
+
+ Subscriber proc sweep {key} {
+ my foreachSubscriber $key sweep
+ }
+
+ Subscriber instproc destroy {} {
+ my close
+ next
+ }
+
Subscriber instproc init {} {
[my info class] instvar subscriptions
lappend subscriptions([my key]) [self]
@@ -647,6 +698,7 @@
bgdelivery proc subscribe {key {initmsg ""} {mode default} } {
set ch [ns_conn channel]
thread::transfer [my get_tid] $ch
+ #my do ::Subscriber sweep $key
my do ::Subscriber new -channel $ch -key $key -user_id [ad_conn user_id] -mode $mode
}