Index: openacs-4/packages/xotcl-core/tcl/bgdelivery-procs.tcl =================================================================== RCS file: /usr/local/cvsroot/openacs-4/packages/xotcl-core/tcl/bgdelivery-procs.tcl,v diff -u -r1.45 -r1.46 --- openacs-4/packages/xotcl-core/tcl/bgdelivery-procs.tcl 17 Feb 2013 11:48:06 -0000 1.45 +++ openacs-4/packages/xotcl-core/tcl/bgdelivery-procs.tcl 21 Feb 2013 12:31:33 -0000 1.46 @@ -288,35 +288,86 @@ } } - Subscriber proc broadcast {key msg} { + Subscriber instproc close {} { + set channel [my channel] + # + # It is important to make the channel non-blocking for the close, + # since otherwise the close operation might block and bring all of + # bgdelivery to a halt. + # + catch {fconfigure $channel -blocking false} + catch {close $channel} + } + + Subscriber instproc sweep {args} { + # + # when the sweeper raises an error the caller (foreachSubscriber) + # destroys the instance. In this step the peer connection is close + # as well. + # + set channel [my channel] + if {[catch {set eof [eof $channel]}]} {set eof 1} + my log "sweep [my channel] EOF $eof" + if {$eof} { + error "connection $channel closed by peer" + } + # make an io-attempt to trigger EOF + if {[catch { + set blocking [fconfigure $channel -blocking] + fconfigure $channel -blocking false + set x [read $channel] + fconfigure $channel -blocking $blocking + } errorMsg]} { + error "connection $channel closed due to IO error" + } + } + + Subscriber instproc send {msg} { + my log "" + if {[my mode] eq "scripted"} { + set smsg "\n" + set smsg [format %x [string length $smsg]]\r\n$smsg\r\n + } else { + set smsg $msg + } + #my log "-- sending to subscriber for [my key] $smsg ch=[my channel] \ + # mode=[my mode], user_id [my user_id]" + puts -nonewline [my channel] $smsg + flush [my channel] + } + + Subscriber proc foreachSubscriber {key method {argument ""}} { + my msg "$key $method '$argument'" my instvar subscriptions if {[info exists subscriptions($key)]} { set subs1 [list] foreach s $subscriptions($key) { - if {[catch { - if {[$s mode] eq "scripted"} { - set smsg "\n" - set smsg [format %x [string length $smsg]]\r\n$smsg\r\n - } else { - set smsg $msg - } - my log "-- sending to subscriber for $key $smsg ch=[$s channel] \ - mode=[$s mode], user_id [$s user_id]" - puts -nonewline [$s channel] $smsg - flush [$s channel] - } errmsg]} { - ns_log notice "error in send to subscriber (key=$key): $errmsg" - catch {close [$s channel]} + if {[catch {$s $method $argument} errMsg]} { + ns_log notice "error in $method to subscriber $s (key=$key): $errMsg" $s destroy } else { lappend subs1 $s } } set subscriptions($key) $subs1 } + } + + Subscriber proc broadcast {key msg} { + my foreachSubscriber $key send $msg incr ::message_count } + + Subscriber proc sweep {key} { + my foreachSubscriber $key sweep + } + + Subscriber instproc destroy {} { + my close + next + } + Subscriber instproc init {} { [my info class] instvar subscriptions lappend subscriptions([my key]) [self] @@ -647,6 +698,7 @@ bgdelivery proc subscribe {key {initmsg ""} {mode default} } { set ch [ns_conn channel] thread::transfer [my get_tid] $ch + #my do ::Subscriber sweep $key my do ::Subscriber new -channel $ch -key $key -user_id [ad_conn user_id] -mode $mode }