Interesting approach!!! Some questions and comments interspersed.
Thanx, Paul
Good approach! Will steal it. ;-)
Interesting approach! My main concern would be that this might extend
grace periods (which has come up with preemptable RCU). Or do you
have some clever way of overlapping the required processing for the
various states?
How do you handle the uncertainty as to when a given state begins?
Here is an example sequence of events that I would be worried about:
o CPU 0 notices the end of a grace period, so updates the state.
o CPU 1 notices the new grace period while in a quiescent state.
  It checks into the RCU state machine.
o CPU 1 starts a long-running RCU read-side critical section.
o CPU 2 deletes one of the elements that CPU 1 is referencing,
  and registers an RCU callback to free it after a grace period.
o CPU 2 notices that a new grace period has commenced.
o The remaining CPUs (other than CPU 1, which already passed
  through a quiescent state) pass through a quiescent state, ending
  the grace period. CPU 1 remains in its RCU read-side critical
  section.
o The RCU grace period ends, permitting CPU 2 to free the element
  that it removed -- but which CPU 1 is still referencing.
This scenario used to be handled by an arcane and confusing combination of
flags and queues. Jiangshan recently unified this into another stage of
queuing, which seems to work very well -- and much more straightforwardly.
It is possible that your state machine handles this, but if so, it was not
obvious to me.
The approach preemptable RCU uses to interact with dynticks should
handle this. You mentioned using atomic operations previously, which
might simplify the code (Steve and I were concerned that use of atomic
ops in the interrupt path would get an automatic NACK, but it is quite
possible that we were being too paranoid).
People are apparently looking at 4096 CPUs these days, FWIW. I don't
see any architectural limit in your code, so just FYI.
This could be made to work, but the advantage of preemptable RCU's
upcounter approach is the ability to count momentarily dropping into
dyntick idle mode as a quiescent state -- even if we don't happen to
look at that CPU while it is actually residing in dyntick idle mode.
> - set_need_resched();
> - spin_lock_irqsave(&rcp->lock, flags);
> - if (unlikely(!rcp->signaled)) {
> - rcp->signaled = 1;
> - /*
> - * Don't send IPI to itself. With irqs disabled,
> - * rdp->cpu is the current cpu.
> - *
> - * cpu_online_map is updated by the _cpu_down()
> - * using __stop_machine(). Since we're in irqs disabled
> - * section, __stop_machine() is not exectuting, hence
> - * the cpu_online_map is stable.
> - *
> - * However, a cpu might have been offlined _just_ before
> - * we disabled irqs while entering here.
> - * And rcu subsystem might not yet have handled the CPU_DEAD
> - * notification, leading to the offlined cpu's bit
> - * being set in the rcp->cpumask.
> - *
> - * Hence cpumask = (rcp->cpumask & cpu_online_map) to prevent
> - * sending smp_reschedule() to an offlined CPU.
> - */
> - cpus_and(cpumask, rcp->cpumask, cpu_online_map);
> - cpu_clear(rdp->cpu, cpumask);
> - for_each_cpu_mask_nr(cpu, cpumask)
> - smp_send_reschedule(cpu);
> - }
> - spin_unlock_irqrestore(&rcp->lock, flags);
> +#define RCU_CPUMODE_INVALID -2
> +#define RCU_CPUMODE_DELAYED -1
> +DEFINE_PER_CPU(int, rcu_cpumode) = { 0L };
> +
> +int qlowmark = 100;
> +
> +long rcu_batches_completed(void)
> +{
> + return rcu_global_state_normal.completed;
> }
> -#else
> -static inline void force_quiescent_state(struct rcu_data *rdp,
> - struct rcu_ctrlblk *rcp)
> +
> +long rcu_batches_completed_bh(void)
> {
> - set_need_resched();
> + return rcu_global_state_normal.completed;
> }
> -#endif
>
> -static void __call_rcu(struct rcu_head *head, struct rcu_ctrlblk *rcp,
> - struct rcu_data *rdp)
> +/**
> + * rcu_state_startcycle - start the next rcu cycle
> + * @rgs: global rcu state
> + *
> + * The function starts the next rcu cycle, either immediately or
> + * by setting rgs->start_immediately.
> + */
> +static void rcu_state_startcycle(struct rcu_global_state *rgs)
> +{
> + unsigned seq;
> + int do_real_start;
> +
> + BUG_ON(!irqs_disabled());
> + do {
> + seq = read_seqbegin(&rgs->lock);
> + if (rgs->start_immediately == 0) {
> + do_real_start = 1;
> + } else {
> + do_real_start = 0;
> + BUG_ON(rcu_cpumask_getstate(&rgs->cpus) == RCU_STATE_DESTROY);
> + }
> + } while (read_seqretry(&rgs->lock, seq));
> +
> + if (do_real_start) {
> + write_seqlock(&rgs->lock);
> + switch(rcu_cpumask_getstate(&rgs->cpus)) {
> + case RCU_STATE_DESTROY_AND_COLLECT:
> + case RCU_STATE_GRACE:
> + rgs->start_immediately = 1;
> + break;
> + case RCU_STATE_DESTROY:
> + rcu_cpumask_init(&rgs->cpus, RCU_STATE_DESTROY_AND_COLLECT, 1);
> + smp_wmb();
> + BUG_ON(rgs->start_immediately);
> + break;
> + default:
> + BUG();
> + }
> + write_sequnlock(&rgs->lock);
> + }
> +}
> +
> +/*
> + * Delay that can occur for synchronize_rcu() callers
> + */
> +#define RCU_MAX_DELAY (HZ/30+1)
> +
> +static void rcu_checkqlen(struct rcu_global_state *rgs, struct rcu_cpu_state *rcs, int inc)
> {
> - long batch;
> + BUG_ON(!irqs_disabled());
> + if (unlikely(rcs->newqlen == 0)) {
> + rcs->timeout = jiffies + RCU_MAX_DELAY;
> + }
> + if ((rcs->newqlen < qlowmark) && (rcs->newqlen+inc >= qlowmark))
> + rcu_state_startcycle(rgs);
>
> - head->next = NULL;
> - smp_mb(); /* Read of rcu->cur must happen after any change by caller. */
> + rcs->newqlen += inc;
>
> /*
> - * Determine the batch number of this callback.
> - *
> - * Using ACCESS_ONCE to avoid the following error when gcc eliminates
> - * local variable "batch" and emits codes like this:
> - * 1) rdp->batch = rcp->cur + 1 # gets old value
> - * ......
> - * 2)rcu_batch_after(rcp->cur + 1, rdp->batch) # gets new value
> - * then [*nxttail[0], *nxttail[1]) may contain callbacks
> - * that batch# = rdp->batch, see the comment of struct rcu_data.
> + * This is not really a bug, it might happen when interrupt calls
> + * call_rcu() while the cpu is in nohz mode. see rcu_irq_exit
> */
> - batch = ACCESS_ONCE(rcp->cur) + 1;
> -
> - if (rdp->nxtlist && rcu_batch_after(batch, rdp->batch)) {
> - /* process callbacks */
> - rdp->nxttail[0] = rdp->nxttail[1];
> - rdp->nxttail[1] = rdp->nxttail[2];
> - if (rcu_batch_after(batch - 1, rdp->batch))
> - rdp->nxttail[0] = rdp->nxttail[2];
> - }
> + WARN_ON( (rcs->newqlen >= qlowmark) && (rcu_cpumask_getstate(&rgs->cpus) == RCU_STATE_DESTROY));
> +}
>
> - rdp->batch = batch;
> - *rdp->nxttail[2] = head;
> - rdp->nxttail[2] = &head->next;
>
> - if (unlikely(++rdp->qlen > qhimark)) {
> - rdp->blimit = INT_MAX;
> - force_quiescent_state(rdp, &rcu_ctrlblk);
> +static void __call_rcu(struct rcu_head *head, struct rcu_global_state *rgs,
> + struct rcu_cpu_state *rcs)
> +{
> + if (rcs->new == NULL) {
> + rcs->new = head;
> + } else {
> + (*rcs->newtail) = head;
> }
> + rcs->newtail = &head->next;
> +
> + rcu_checkqlen(rgs, rcs, 1);
> }
>
> /**
> @@ -182,7 +205,7 @@ void call_rcu(struct rcu_head *head,
>
> head->func = func;
> local_irq_save(flags);
> - __call_rcu(head, &rcu_ctrlblk, &__get_cpu_var(rcu_data));
> + __call_rcu(head, &rcu_global_state_normal, &__get_cpu_var(rcu_cpudata_normal));
> local_irq_restore(flags);
> }
> EXPORT_SYMBOL_GPL(call_rcu);
> @@ -210,462 +233,367 @@ void call_rcu_bh(struct rcu_head *head,
>
> head->func = func;
> local_irq_save(flags);
> - __call_rcu(head, &rcu_bh_ctrlblk, &__get_cpu_var(rcu_bh_data));
> + __call_rcu(head, &rcu_global_state_bh, &__get_cpu_var(rcu_cpudata_bh));
> local_irq_restore(flags);
> }
> EXPORT_SYMBOL_GPL(call_rcu_bh);
>
> -/*
> - * Return the number of RCU batches processed thus far. Useful
> - * for debug and statistics.
> - */
> -long rcu_batches_completed(void)
> -{
> - return rcu_ctrlblk.completed;
> -}
> -EXPORT_SYMBOL_GPL(rcu_batches_completed);
> -
> -/*
> - * Return the number of RCU batches processed thus far. Useful
> - * for debug and statistics.
> - */
> -long rcu_batches_completed_bh(void)
> -{
> - return rcu_bh_ctrlblk.completed;
> -}
> -EXPORT_SYMBOL_GPL(rcu_batches_completed_bh);
> +#define RCU_BATCH_MIN 100
> +#define RCU_BATCH_INCFACTOR 2
> +#define RCU_BATCH_DECFACTOR 4
>
> -/* Raises the softirq for processing rcu_callbacks. */
> -static inline void raise_rcu_softirq(void)
> +static void rcu_move_and_raise(struct rcu_cpu_state *rcs, int do_raise)
> {
> - raise_softirq(RCU_SOFTIRQ);
> -}
> + struct rcu_cpu_dead *rcd = &get_cpu_var(rcu_cpudata_dead);
>
> -/*
> - * Invoke the completed RCU callbacks. They are expected to be in
> - * a per-cpu list.
> - */
> -static void rcu_do_batch(struct rcu_data *rdp)
> -{
> - struct rcu_head *next, *list;
> - int count = 0;
> + BUG_ON(!irqs_disabled());
>
> - list = rdp->donelist;
> - while (list) {
> - next = list->next;
> - prefetch(next);
> - list->func(list);
> - list = next;
> - if (++count >= rdp->blimit)
> - break;
> + /* update batch limit:
> + * - if there are still old entries when new entries are added:
> + * double the batch count.
> + * - if there are no old entries: reduce it by 25%, but never below 100.
> + */
> + if (rcd->deadqlen)
> + rcd->batchcount = rcd->batchcount*RCU_BATCH_INCFACTOR;
> + else
> + rcd->batchcount = rcd->batchcount-rcd->batchcount/RCU_BATCH_DECFACTOR;
> + if (rcd->batchcount < RCU_BATCH_MIN)
> + rcd->batchcount = RCU_BATCH_MIN;
> +
> + if (rcs->old != NULL) {
> + if (rcd->dead == NULL) {
> + rcd->dead = rcs->old;
> + } else {
> + (*rcd->deadtail) = rcs->old;
> + }
> + rcd->deadtail = rcs->oldtail;
> + rcd->deadqlen += rcs->oldqlen;
> }
> - rdp->donelist = list;
>
> - local_irq_disable();
> - rdp->qlen -= count;
> - local_irq_enable();
> - if (rdp->blimit == INT_MAX && rdp->qlen <= qlowmark)
> - rdp->blimit = blimit;
> + rcs->old = NULL;
> + rcs->oldtail = NULL;
> + rcs->oldqlen = 0;
>
> - if (!rdp->donelist)
> - rdp->donetail = &rdp->donelist;
> - else
> - raise_rcu_softirq();
> -}
> -
> -/*
> - * Grace period handling:
> - * The grace period handling consists out of two steps:
> - * - A new grace period is started.
> - * This is done by rcu_start_batch. The start is not broadcasted to
> - * all cpus, they must pick this up by comparing rcp->cur with
> - * rdp->quiescbatch. All cpus are recorded in the
> - * rcu_ctrlblk.cpumask bitmap.
> - * - All cpus must go through a quiescent state.
> - * Since the start of the grace period is not broadcasted, at least two
> - * calls to rcu_check_quiescent_state are required:
> - * The first call just notices that a new grace period is running. The
> - * following calls check if there was a quiescent state since the beginning
> - * of the grace period. If so, it updates rcu_ctrlblk.cpumask. If
> - * the bitmap is empty, then the grace period is completed.
> - * rcu_check_quiescent_state calls rcu_start_batch(0) to start the next grace
> - * period (if necessary).
> - */
> + if (do_raise)
> + raise_softirq(RCU_SOFTIRQ);
>
> -#ifdef CONFIG_DEBUG_RCU_STALL
> -
> -static inline void record_gp_check_time(struct rcu_ctrlblk *rcp)
> -{
> - rcp->gp_check = get_seconds() + 3;
> + put_cpu_var(rcu_cpudata_dead);
> }
>
> -static void print_other_cpu_stall(struct rcu_ctrlblk *rcp)
> +static void __rcu_state_machine(struct rcu_global_state *rgs, struct rcu_cpu_state *rcs,
> + int global_state, int is_quiet, int do_raise, int cpu)
> {
> - int cpu;
> - long delta;
> + int inc_state;
> unsigned long flags;
>
> - /* Only let one CPU complain about others per time interval. */
> -
> - spin_lock_irqsave(&rcp->lock, flags);
> - delta = get_seconds() - rcp->gp_check;
> - if (delta < 2L || cpus_empty(rcp->cpumask)) {
> - spin_unlock(&rcp->lock);
> - return;
> + /*
> + * Theoretically, this code should run under read_seqbegin().
> + * But: important chages (i.e. from COLLECT to GRACE,
> + * from GRACE to DESTROY) only happen when all cpus have completed
> + * their work. If rcu_cpumask_getstate(&rgs->cpus) != rcs->state, then we haven't completed
> + * our work yet. Thus such a change cannot happen.
> + * The only change that might happen is a change from RCU_STATE_DESTROY
> + * to RCU_STATE_DESTROY_AND_COLLECT. We'll notice that in the next
> + * round.
> + * no need for an mb() either - it simply doesn't matter.
> + * Actually: when rcu_state_startcycle() is called, then it's guaranteed
> + * that global_state and rcu_cpumask_getstate(&rgs->cpus) do not match...
> + */
> + local_irq_save(flags);
> + if (global_state == RCU_STATE_DESTROY && rcs->newqlen > 0 &&
> + time_after(jiffies, rcs->timeout) && do_raise) {
> +printk(KERN_ERR" delayed rcu start for %p: %ld entries (cpu %d, ptr %p).\n", rgs, rcs->newqlen, cpu, rcs);
> + rcu_state_startcycle(rgs);
> }
> - rcp->gp_check = get_seconds() + 30;
> - spin_unlock_irqrestore(&rcp->lock, flags);
> -
> - /* OK, time to rat on our buddy... */
> -
> - printk(KERN_ERR "RCU detected CPU stalls:");
> - for_each_cpu_mask(cpu, rcp->cpumask)
> - printk(" %d", cpu);
> - printk(" (detected by %d, t=%lu/%lu)\n",
> - smp_processor_id(), get_seconds(), rcp->gp_check);
> -}
> -
> -static void print_cpu_stall(struct rcu_ctrlblk *rcp)
> -{
> - unsigned long flags;
> -
> - printk(KERN_ERR "RCU detected CPU %d stall (t=%lu/%lu)\n",
> - smp_processor_id(), get_seconds(), rcp->gp_check);
> - dump_stack();
> - spin_lock_irqsave(&rcp->lock, flags);
> - if ((long)(get_seconds() - rcp->gp_check) >= 0L)
> - rcp->gp_check = get_seconds() + 30;
> - spin_unlock_irqrestore(&rcp->lock, flags);
> -}
> -
> -static void check_cpu_stall(struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
> -{
> - long delta;
> -
> - delta = get_seconds() - rcp->gp_check;
> - if (cpu_isset(smp_processor_id(), rcp->cpumask) && delta >= 0L) {
>
> - /* We haven't checked in, so go dump stack. */
> -
> - print_cpu_stall(rcp);
> -
> - } else {
> - if (!cpus_empty(rcp->cpumask) && delta >= 2L) {
> - /* They had two seconds to dump stack, so complain. */
> - print_other_cpu_stall(rcp);
> + inc_state = 0;
> + if (global_state != rcs->state) {
> + switch(global_state) {
> + case RCU_STATE_DESTROY:
> + rcs->state = RCU_STATE_DESTROY;
> + rcu_move_and_raise(rcs, do_raise);
> + break;
> + case RCU_STATE_DESTROY_AND_COLLECT:
> + rcs->state = RCU_STATE_DESTROY_AND_COLLECT;
> + rcu_move_and_raise(rcs, do_raise);
> + rcs->old = rcs->new;
> + rcs->oldtail = rcs->newtail;
> + rcs->oldqlen = rcs->newqlen;
> + rcs->new = NULL;
> + rcs->newtail = NULL;
> + rcs->newqlen = 0;
> + rcs->looking = 0;
> + if (rcu_cpumask_clear_and_test(&rgs->cpus, cpu))
> + inc_state = 1;
> + break;
> + case RCU_STATE_GRACE:
> + if (is_quiet || (rcs->quiet && rcs->looking)) {
> + rcs->state = RCU_STATE_GRACE;
> + if (rcu_cpumask_clear_and_test(&rgs->cpus, cpu))
> + inc_state = 1;
> + }
> + rcs->quiet = 0;
> + rcs->looking = 1;
> + break;
> + default:
> + BUG();
> }
> }
> -}
> -
> -#else /* #ifdef CONFIG_DEBUG_RCU_STALL */
>
> -static inline void record_gp_check_time(struct rcu_ctrlblk *rcp)
> -{
> -}
> -
> -static inline void
> -check_cpu_stall(struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
> -{
> -}
> -
> -#endif /* #else #ifdef CONFIG_DEBUG_RCU_STALL */
> -
> -/*
> - * Register a new batch of callbacks, and start it up if there is currently no
> - * active batch and the batch to be registered has not already occurred.
> - * Caller must hold rcu_ctrlblk.lock.
> - */
> -static void rcu_start_batch(struct rcu_ctrlblk *rcp)
> -{
> - if (rcp->cur != rcp->pending &&
> - rcp->completed == rcp->cur) {
> - rcp->cur++;
> - record_gp_check_time(rcp);
> + if (unlikely(inc_state)) {
> + local_irq_save(flags);
> + write_seqlock(&rgs->lock);
>
> + BUG_ON(rcu_cpumask_getstate(&rgs->cpus) != rcs->state);
> + BUG_ON(global_state != rcu_cpumask_getstate(&rgs->cpus));
> /*
> - * Accessing nohz_cpu_mask before incrementing rcp->cur needs a
> - * Barrier Otherwise it can cause tickless idle CPUs to be
> - * included in rcp->cpumask, which will extend graceperiods
> - * unnecessarily.
> + * advance the state machine:
> + * - from COLLECT to GRACE
> + * - from GRACE to DESTROY/COLLECT
> */
> - smp_mb();
> - cpus_andnot(rcp->cpumask, cpu_online_map, nohz_cpu_mask);
> -
> - rcp->signaled = 0;
> + switch(rcu_cpumask_getstate(&rgs->cpus)) {
> + case RCU_STATE_DESTROY_AND_COLLECT:
> + rcu_cpumask_init(&rgs->cpus, RCU_STATE_GRACE, 1);
> + break;
> + case RCU_STATE_GRACE:
> + rgs->completed++;
> + if (rgs->start_immediately) {
> + rcu_cpumask_init(&rgs->cpus, RCU_STATE_DESTROY_AND_COLLECT, 1);
> + } else {
> + rcu_cpumask_init(&rgs->cpus, RCU_STATE_DESTROY, 0);
> + }
> + rgs->start_immediately = 0;
> + break;
> + default:
> + BUG();
> + }
> + write_sequnlock(&rgs->lock);
> + local_irq_restore(flags);
> }
> }
>
> -/*
> - * cpu went through a quiescent state since the beginning of the grace period.
> - * Clear it from the cpu mask and complete the grace period if it was the last
> - * cpu. Start another grace period if someone has further entries pending
> - */
> -static void cpu_quiet(int cpu, struct rcu_ctrlblk *rcp)
> +static void rcu_state_machine(struct rcu_global_state *rgs, struct rcu_cpu_state *rcs, int is_quiet, int cpu)
> {
> - cpu_clear(cpu, rcp->cpumask);
> - if (cpus_empty(rcp->cpumask)) {
> - /* batch completed ! */
> - rcp->completed = rcp->cur;
> - rcu_start_batch(rcp);
> - }
> -}
> + int global_state = rcu_cpumask_getstate(&rgs->cpus);
>
> -/*
> - * Check if the cpu has gone through a quiescent state (say context
> - * switch). If so and if it already hasn't done so in this RCU
> - * quiescent cycle, then indicate that it has done so.
> - */
> -static void rcu_check_quiescent_state(struct rcu_ctrlblk *rcp,
> - struct rcu_data *rdp)
> -{
> - unsigned long flags;
> + /* gcc should not optimize away the local variable global_state... */
> + barrier();
> + __rcu_state_machine(rgs, rcs, global_state, is_quiet, 1, cpu);
> +}
>
> - if (rdp->quiescbatch != rcp->cur) {
> - /* start new grace period: */
> - rdp->qs_pending = 1;
> - rdp->passed_quiesc = 0;
> - rdp->quiescbatch = rcp->cur;
> - return;
> - }
> +#if defined(CONFIG_HOTPLUG_CPU) || defined (CONFIG_NO_HZ)
>
> - /* Grace period already completed for this cpu?
> - * qs_pending is checked instead of the actual bitmap to avoid
> - * cacheline trashing.
> - */
> - if (!rdp->qs_pending)
> - return;
> +static void __rcu_remove_cpu(struct rcu_global_state *rgs, struct rcu_cpu_state *rcs, int cpu)
> +{
> + int global_state;
> + unsigned seq;
>
> - /*
> - * Was there a quiescent state since the beginning of the grace
> - * period? If no, then exit and wait for the next call.
> + BUG_ON(!irqs_disabled());
> + /* task 1:
> + * Do the work that the cpu is still supposed to do.
> + * We rely on the lock inside the rcu_cpumask, that guarantees that
> + * we neither do too much nor too little.
> + * But do not raise the softirq, the caller is responsible handling
> + * the entries stil in the queues.
> */
> - if (!rdp->passed_quiesc)
> - return;
> - rdp->qs_pending = 0;
> + global_state = rcu_cpumask_removecpu(&rgs->cpus, cpu);
>
> - spin_lock_irqsave(&rcp->lock, flags);
> /*
> - * rdp->quiescbatch/rcp->cur and the cpu bitmap can come out of sync
> - * during cpu startup. Ignore the quiescent state.
> + * ensure that we are not in the middle of updating
> + * rcu_cpumask_getstate(&rgs->cpus): otherwise __rcu_state_machine()
> + * would return with "nothing to do", although
> + * the cpu must do something.
> */
> - if (likely(rdp->quiescbatch == rcp->cur))
> - cpu_quiet(rdp->cpu, rcp);
> + do {
> + seq = read_seqbegin(&rgs->lock);
> + } while (read_seqretry(&rgs->lock, seq));
>
> - spin_unlock_irqrestore(&rcp->lock, flags);
> + __rcu_state_machine(rgs, rcs, global_state, 1, 0, cpu);
> }
>
> +#endif
>
> #ifdef CONFIG_HOTPLUG_CPU
> -
> -/* warning! helper for rcu_offline_cpu. do not use elsewhere without reviewing
> - * locking requirements, the list it's pulling from has to belong to a cpu
> - * which is dead and hence not processing interrupts.
> +/**
> + * rcu_bulk_add - bulk add new rcu objects.
> + * @rgs: global rcu state
> + * @rcs: cpu state
> + * @h: linked list of rcu objects.
> + *
> + * Must be called with enabled local interrupts
> */
> -static void rcu_move_batch(struct rcu_data *this_rdp, struct rcu_head *list,
> - struct rcu_head **tail, long batch)
> +static void rcu_bulk_add(struct rcu_global_state *rgs, struct rcu_cpu_state *rcs, struct rcu_head *h, struct rcu_head **htail, int len)
> {
> - if (list) {
> +
> + BUG_ON(irqs_disabled());
> +
> + if (len > 0) {
> local_irq_disable();
> - this_rdp->batch = batch;
> - *this_rdp->nxttail[2] = list;
> - this_rdp->nxttail[2] = tail;
> + if (rcs->new == NULL) {
> + rcs->new = h;
> + } else {
> + (*rcs->newtail) = h;
> + }
> + rcs->newtail = htail;
> +
> + rcu_checkqlen(rgs, rcs, len);
> local_irq_enable();
> }
> }
>
> -static void __rcu_offline_cpu(struct rcu_data *this_rdp,
> - struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
> -{
> - unsigned long flags;
>
> +static void __rcu_offline_cpu(struct rcu_global_state *rgs, struct rcu_cpu_state *this_rcs,
> + struct rcu_cpu_state *other_rcs, int cpu)
> +{
> /*
> - * if the cpu going offline owns the grace period
> - * we can block indefinitely waiting for it, so flush
> - * it here
> + * task 1: Do the work that the other cpu is still supposed to do.
> */
> - spin_lock_irqsave(&rcp->lock, flags);
> - if (rcp->cur != rcp->completed)
> - cpu_quiet(rdp->cpu, rcp);
> - rcu_move_batch(this_rdp, rdp->donelist, rdp->donetail, rcp->cur + 1);
> - rcu_move_batch(this_rdp, rdp->nxtlist, rdp->nxttail[2], rcp->cur + 1);
> - spin_unlock(&rcp->lock);
> -
> - this_rdp->qlen += rdp->qlen;
> - local_irq_restore(flags);
> + __rcu_remove_cpu(rgs, other_rcs, cpu);
> + per_cpu(rcu_cpumode, cpu) = RCU_CPUMODE_INVALID;
> +
> + /* task 2: move all entries from the new cpu into the lists of the current cpu.
> + * locking: The other cpu is dead, thus no locks are required.
> + * Thus it's more or less a bulk call_rcu().
> + * For the sake of simplicity, all objects are treated as "new", even the objects
> + * that are already in old.
> + */
> + rcu_bulk_add(rgs, this_rcs, other_rcs->new, other_rcs->newtail, other_rcs->newqlen);
> + rcu_bulk_add(rgs, this_rcs, other_rcs->old, other_rcs->oldtail, other_rcs->oldqlen);
> }
>
> static void rcu_offline_cpu(int cpu)
> {
> - struct rcu_data *this_rdp = &get_cpu_var(rcu_data);
> - struct rcu_data *this_bh_rdp = &get_cpu_var(rcu_bh_data);
> -
> - __rcu_offline_cpu(this_rdp, &rcu_ctrlblk,
> - &per_cpu(rcu_data, cpu));
> - __rcu_offline_cpu(this_bh_rdp, &rcu_bh_ctrlblk,
> - &per_cpu(rcu_bh_data, cpu));
> - put_cpu_var(rcu_data);
> - put_cpu_var(rcu_bh_data);
> -}
> + struct rcu_cpu_state *this_rcs_normal = &get_cpu_var(rcu_cpudata_normal);
> + struct rcu_cpu_state *this_rcs_bh = &get_cpu_var(rcu_cpudata_bh);
> + struct rcu_cpu_dead *this_rcd, *other_rcd;
>
> -#else
> + BUG_ON(irqs_disabled());
>
> -static void rcu_offline_cpu(int cpu)
> -{
> -}
> + /* step 1: move new & old lists, clear cpu bitmask */
> + __rcu_offline_cpu(&rcu_global_state_normal, this_rcs_normal,
> + &per_cpu(rcu_cpudata_normal, cpu), cpu);
> + __rcu_offline_cpu(&rcu_global_state_bh, this_rcs_bh,
> + &per_cpu(rcu_cpudata_bh, cpu), cpu);
> + put_cpu_var(rcu_cpudata_normal);
> + put_cpu_var(rcu_cpudata_bh);
>
> -#endif
> -
> -/*
> - * This does the RCU processing work from softirq context.
> - */
> -static void __rcu_process_callbacks(struct rcu_ctrlblk *rcp,
> - struct rcu_data *rdp)
> -{
> - long completed_snap;
> + /* step 2: move dead list */
> + this_rcd = &get_cpu_var(rcu_cpudata_dead);
> + other_rcd = &per_cpu(rcu_cpudata_dead, cpu);
>
> - if (rdp->nxtlist) {
> + if (other_rcd->dead != NULL) {
> local_irq_disable();
> - completed_snap = ACCESS_ONCE(rcp->completed);
> -
> - /*
> - * move the other grace-period-completed entries to
> - * [rdp->nxtlist, *rdp->nxttail[0]) temporarily
> - */
> - if (!rcu_batch_before(completed_snap, rdp->batch))
> - rdp->nxttail[0] = rdp->nxttail[1] = rdp->nxttail[2];
> - else if (!rcu_batch_before(completed_snap, rdp->batch - 1))
> - rdp->nxttail[0] = rdp->nxttail[1];
> -
> - /*
> - * the grace period for entries in
> - * [rdp->nxtlist, *rdp->nxttail[0]) has completed and
> - * move these entries to donelist
> - */
> - if (rdp->nxttail[0] != &rdp->nxtlist) {
> - *rdp->donetail = rdp->nxtlist;
> - rdp->donetail = rdp->nxttail[0];
> - rdp->nxtlist = *rdp->nxttail[0];
> - *rdp->donetail = NULL;
> -
> - if (rdp->nxttail[1] == rdp->nxttail[0])
> - rdp->nxttail[1] = &rdp->nxtlist;
> - if (rdp->nxttail[2] == rdp->nxttail[0])
> - rdp->nxttail[2] = &rdp->nxtlist;
> - rdp->nxttail[0] = &rdp->nxtlist;
> + if (this_rcd->dead == NULL) {
> + this_rcd->dead = other_rcd->dead;
> + } else {
> + (*this_rcd->deadtail) = other_rcd->dead;
> }
> -
> + this_rcd->deadtail = other_rcd->deadtail;
> + this_rcd->deadqlen += other_rcd->deadqlen;
> local_irq_enable();
> -
> - if (rcu_batch_after(rdp->batch, rcp->pending)) {
> - unsigned long flags;
> -
> - /* and start it/schedule start if it's a new batch */
> - spin_lock_irqsave(&rcp->lock, flags);
> - if (rcu_batch_after(rdp->batch, rcp->pending)) {
> - rcp->pending = rdp->batch;
> - rcu_start_batch(rcp);
> - }
> - spin_unlock_irqrestore(&rcp->lock, flags);
> - }
> }
>
> - rcu_check_quiescent_state(rcp, rdp);
> - if (rdp->donelist)
> - rcu_do_batch(rdp);
> + put_cpu_var(rcu_cpudata_dead);
> +
> + BUG_ON(rcu_needs_cpu(cpu));
> }
>
> -static void rcu_process_callbacks(struct softirq_action *unused)
> -{
> - /*
> - * Memory references from any prior RCU read-side critical sections
> - * executed by the interrupted code must be see before any RCU
> - * grace-period manupulations below.
> - */
> +#else
>
> - smp_mb(); /* See above block comment. */
> +static void rcu_offline_cpu(int cpu)
> +{
> +}
>
> - __rcu_process_callbacks(&rcu_ctrlblk, &__get_cpu_var(rcu_data));
> - __rcu_process_callbacks(&rcu_bh_ctrlblk, &__get_cpu_var(rcu_bh_data));
> +#endif
>
> +static int __rcu_pending(struct rcu_global_state *rgs, struct rcu_cpu_state *rcs)
> +{
> /*
> - * Memory references from any later RCU read-side critical sections
> - * executed by the interrupted code must be see after any RCU
> - * grace-period manupulations above.
> + * This cpu must do something for the state machine.
> */
> -
> - smp_mb(); /* See above block comment. */
> -}
> -
> -static int __rcu_pending(struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
> -{
> - /* Check for CPU stalls, if enabled. */
> - check_cpu_stall(rcp, rdp);
> -
> - if (rdp->nxtlist) {
> - long completed_snap = ACCESS_ONCE(rcp->completed);
> -
> - /*
> - * This cpu has pending rcu entries and the grace period
> - * for them has completed.
> - */
> - if (!rcu_batch_before(completed_snap, rdp->batch))
> - return 1;
> - if (!rcu_batch_before(completed_snap, rdp->batch - 1) &&
> - rdp->nxttail[0] != rdp->nxttail[1])
> - return 1;
> - if (rdp->nxttail[0] != &rdp->nxtlist)
> - return 1;
> -
> - /*
> - * This cpu has pending rcu entries and the new batch
> - * for then hasn't been started nor scheduled start
> - */
> - if (rcu_batch_after(rdp->batch, rcp->pending))
> - return 1;
> - }
> -
> - /* This cpu has finished callbacks to invoke */
> - if (rdp->donelist)
> + if (rcu_cpumask_getstate(&rgs->cpus) != rcs->state)
> return 1;
> -
> - /* The rcu core waits for a quiescent state from the cpu */
> - if (rdp->quiescbatch != rcp->cur || rdp->qs_pending)
> + /*
> + * The state machine is stopped and the current
> + * cpu has outstanding rcu callbacks
> + */
> + if (rcs->state == RCU_STATE_DESTROY && rcs->newqlen)
> return 1;
>
> - /* nothing to do */
> return 0;
> }
>
> -/*
> +/**
> + * void rcu_pending(int cpu) - check for pending rcu related work.
> + * @cpu: cpu to check.
> + *
> * Check to see if there is any immediate RCU-related work to be done
> * by the current CPU, returning 1 if so. This function is part of the
> * RCU implementation; it is -not- an exported member of the RCU API.
> + *
> + * This function is inherently racy: If it returns 1, then there is something
> + * to do. If it return 0, then there was nothing to do. It's possible that
> + * by the time rcu_pending returns, there is now something to do.
> + *
> */
> int rcu_pending(int cpu)
> {
> - return __rcu_pending(&rcu_ctrlblk, &per_cpu(rcu_data, cpu)) ||
> - __rcu_pending(&rcu_bh_ctrlblk, &per_cpu(rcu_bh_data, cpu));
> + return __rcu_pending(&rcu_global_state_normal, &per_cpu(rcu_cpudata_normal, cpu)) ||
> + __rcu_pending(&rcu_global_state_bh, &per_cpu(rcu_cpudata_bh, cpu));
> }
>
> -/*
> +static int __rcu_needs_cpu(struct rcu_global_state *rgs, struct rcu_cpu_state *rcs)
> +{
> + if (rcs->new)
> + return 1;
> + if (rcs->old)
> + return 1;
> + return 0;
> +}
> +
> +/**
> + * void rcu_needs_cpu(cpu) - check for outstanding rcu work.
> + * @cpu: cpu to check.
> + *
> * Check to see if any future RCU-related work will need to be done
> - * by the current CPU, even if none need be done immediately, returning
> + * by @cpu, even if none need be done immediately, returning
> * 1 if so. This function is part of the RCU implementation; it is -not-
> * an exported member of the RCU API.
> + *
> + * Locking only works properly if the function is called for the current
> + * cpu and with disabled local interupts. It's a prerequisite for
> + * rcu_nohz_enter() that rcu_needs_cpu() return 0. Local interupts must not
> + * be enabled in between, otherwise a softirq could call call_rcu().
> + *
> + * Note: rcu_needs_cpu() can be 0 (cpu not needed) even though rcu_pending()
> + * return 1. This means that the outstanding work can be completed by either
> + * the CPU_DEAD callback or rcu_enter_nohz().
> */
> int rcu_needs_cpu(int cpu)
> {
> - struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
> - struct rcu_data *rdp_bh = &per_cpu(rcu_bh_data, cpu);
> + int ret;
> + BUG_ON(!irqs_disabled());
> +
> + ret = __rcu_needs_cpu(&rcu_global_state_normal, &per_cpu(rcu_cpudata_normal, cpu)) ||
> + __rcu_needs_cpu(&rcu_global_state_bh, &per_cpu(rcu_cpudata_bh, cpu)) ||
> + (per_cpu(rcu_cpudata_dead, cpu).deadqlen > 0);
> +printk(KERN_ERR" rcu_needs cpu %d: %d.\n", cpu, ret);
>
> - return !!rdp->nxtlist || !!rdp_bh->nxtlist || rcu_pending(cpu);
> + return ret;
> }
>
> -/*
> +/**
> + * rcu_check_callback(cpu, user) - external entry point for grace checking
> + * @cpu: cpu id.
> + * @user: user space was interrupted.
> + *
> * Top-level function driving RCU grace-period detection, normally
> * invoked from the scheduler-clock interrupt. This function simply
> * increments counters that are read only from softirq by this same
> * CPU, so there are no memory barriers required.
> + *
> + * This function can run with disabled local interrupts, thus all
> + * callees must use local_irq_save()
> */
> void rcu_check_callbacks(int cpu, int user)
> {
> @@ -679,17 +607,9 @@ void rcu_check_callbacks(int cpu, int user)
> * nested interrupt. In this case, the CPU is in
> * a quiescent state, so count it.
> *
> - * Also do a memory barrier. This is needed to handle
> - * the case where writes from a preempt-disable section
> - * of code get reordered into schedule() by this CPU's
> - * write buffer. The memory barrier makes sure that
> - * the rcu_qsctr_inc() and rcu_bh_qsctr_inc() are see
> - * by other CPUs to happen after any such write.
> */
> -
> - smp_mb(); /* See above block comment. */
> - rcu_qsctr_inc(cpu);
> - rcu_bh_qsctr_inc(cpu);
> + rcu_state_machine(&rcu_global_state_normal, &per_cpu(rcu_cpudata_normal, cpu), 1, cpu);
> + rcu_state_machine(&rcu_global_state_bh, &per_cpu(rcu_cpudata_bh, cpu), 1, cpu);
>
> } else if (!in_softirq()) {
>
> @@ -697,39 +617,233 @@ void rcu_check_callbacks(int cpu, int user)
> * Get here if this CPU did not take its interrupt from
> * softirq, in other words, if it is not interrupting
> * a rcu_bh read-side critical section. This is an _bh
> - * critical section, so count it. The memory barrier
> - * is needed for the same reason as is the above one.
> + * critical section, so count it.
> + */
> + rcu_state_machine(&rcu_global_state_normal, &per_cpu(rcu_cpudata_normal, cpu), 0, cpu);
> + rcu_state_machine(&rcu_global_state_bh, &per_cpu(rcu_cpudata_bh, cpu), 1, cpu);
> + } else {
> + /*
> + * We are interrupting something. Nevertheless - check if we should collect
> + * rcu objects. This can be done from arbitrary context.
> */
> + rcu_state_machine(&rcu_global_state_normal, &per_cpu(rcu_cpudata_normal, cpu), 0, cpu);
> + rcu_state_machine(&rcu_global_state_bh, &per_cpu(rcu_cpudata_bh, cpu), 0, cpu);
> + }
> +}
> +
> +/*
> + * Invoke the completed RCU callbacks.
> + */
> +static void rcu_do_batch(struct rcu_cpu_dead *rcd)
> +{
> + struct rcu_head *list;
> + int i, count;
> +
> + if (!rcd->deadqlen)
> + return;
> +
> + /* step 1: pull up to rcd->batchcount objects */
> + BUG_ON(irqs_disabled());
> + local_irq_disable();
> +
> + if (rcd->deadqlen > rcd->batchcount) {
> + struct rcu_head *walk;
> +
> + list = rcd->dead;
> + count = rcd->batchcount;
> +
> + walk = rcd->dead;
> + for (i=0;i<count;i++)
> + walk = walk->next;
> + rcd->dead = walk;
> +
> + } else {
> + list = rcd->dead;
> + count = rcd->deadqlen;
> +
> + rcd->dead = NULL;
> + rcd->deadtail = NULL;
> + }
> + rcd->deadqlen -= count;
> + BUG_ON(rcd->deadqlen < 0);
> +
> + local_irq_enable();
> +
> + /* step 2: call the rcu callbacks */
> +
> + for (i=0;i<count;i++) {
> + struct rcu_head *next;
>
> - smp_mb(); /* See above block comment. */
> - rcu_bh_qsctr_inc(cpu);
> + next = list->next;
> + prefetch(next);
> + list->func(list);
> + list = next;
> }
> - raise_rcu_softirq();
> +
> + /* step 3: if still entries left, raise the softirq again */
> + if (rcd->deadqlen)
> + raise_softirq(RCU_SOFTIRQ);
> +}
> +
> +static void rcu_process_callbacks(struct softirq_action *unused)
> +{
> + rcu_do_batch(&get_cpu_var(rcu_cpudata_dead));
> + put_cpu_var(rcu_cpudata_dead);
> }
>
> -static void rcu_init_percpu_data(int cpu, struct rcu_ctrlblk *rcp,
> - struct rcu_data *rdp)
> +static void __rcu_add_cpu(struct rcu_global_state *rgs, struct rcu_cpu_state *rcs, int cpu)
> {
> - long flags;
> -
> - spin_lock_irqsave(&rcp->lock, flags);
> - memset(rdp, 0, sizeof(*rdp));
> - rdp->nxttail[0] = rdp->nxttail[1] = rdp->nxttail[2] = &rdp->nxtlist;
> - rdp->donetail = &rdp->donelist;
> - rdp->quiescbatch = rcp->completed;
> - rdp->qs_pending = 0;
> - rdp->cpu = cpu;
> - rdp->blimit = blimit;
> - spin_unlock_irqrestore(&rcp->lock, flags);
> + rcs->state = rcu_cpumask_addcpu(&rgs->cpus, cpu);
> +}
> +
> +#ifdef CONFIG_NO_HZ
> +
> +void rcu_enter_nohz(void)
> +{
> + int cpu = smp_processor_id();
> + int *pmode;
> +
> + /*
> + * Calls to call_rcu() between rcu_needs_cpu() and rcu_enter_nohz() are
> + * not permitted.
> + * Thus both must be called with disabled local interrupts,
> + * without enabling the interrupts in between.
> + *
> + * Note: disabling interrupts only prevents call_rcu().
> + * it can obviously happen that another cpu forwards
> + * the state machine. That doesn't hurt: __rcu_remove_cpu()
> + * does the work that we need to do.
> + */
> + BUG_ON(!irqs_disabled());
> +
> + pmode = &get_cpu_var(rcu_cpumode);
> + BUG_ON(*pmode != RCU_CPUMODE_DELAYED);
> + *pmode = 0;
> + put_cpu_var(rcu_cpumode);
> +
> + __rcu_remove_cpu(&rcu_global_state_normal, &get_cpu_var(rcu_cpudata_normal), cpu);
> + put_cpu_var(rcu_cpudata_normal);
> + __rcu_remove_cpu(&rcu_global_state_bh, &get_cpu_var(rcu_cpudata_bh), cpu);
> + put_cpu_var(rcu_cpudata_bh);
> +
> + BUG_ON(rcu_needs_cpu(cpu));
> +printk(KERN_ERR" enter_nohz %d.\n", cpu);
> +}
> +
> +void rcu_exit_nohz(void)
> +{
> + int cpu = smp_processor_id();
> + int *pmode;
> +
> + BUG_ON(!irqs_disabled());
> +
> + pmode = &get_cpu_var(rcu_cpumode);
> + BUG_ON(*pmode != 0);
> + *pmode = RCU_CPUMODE_DELAYED;
> + put_cpu_var(rcu_cpumode);
> +
> + __rcu_add_cpu(&rcu_global_state_normal, &get_cpu_var(rcu_cpudata_normal), cpu);
> + put_cpu_var(rcu_cpudata_normal);
> + __rcu_add_cpu(&rcu_global_state_bh, &get_cpu_var(rcu_cpudata_bh), cpu);
> + put_cpu_var(rcu_cpudata_bh);
> +
> +printk(KERN_ERR" exit_nohz %d.\n", cpu);
> +}
> +
> +void rcu_irq_enter(void)
> +{
> + int *pmode;
> +
> + BUG_ON(!irqs_disabled());
> +
> + pmode = &get_cpu_var(rcu_cpumode);
> + if (unlikely(*pmode != RCU_CPUMODE_DELAYED)) {
> +printk(KERN_ERR" irq enter %d, %d.\n", smp_processor_id(), *pmode);
> + /* FIXME:
> + * This code is not NMI safe. especially:
> + * __rcu_add_cpu acquires spinlocks.
> + */
> + if (*pmode == 0) {
> + int cpu = smp_processor_id();
> +
> + __rcu_add_cpu(&rcu_global_state_normal,&get_cpu_var(rcu_cpudata_normal), cpu);
> + put_cpu_var(rcu_cpudata_normal);
> + __rcu_add_cpu(&rcu_global_state_bh,&get_cpu_var(rcu_cpudata_bh), cpu);
> + put_cpu_var(rcu_cpudata_bh);
> + }
> + (*pmode)++;
> + }
> + put_cpu_var(rcu_cpumode);
> +}
> +
> +void rcu_irq_exit(void)
> +{
> + int *pmode;
> +
> + BUG_ON(!irqs_disabled());
> +
> + pmode = &get_cpu_var(rcu_cpumode);
> + if (unlikely(*pmode != RCU_CPUMODE_DELAYED)) {
> +
> +printk(KERN_ERR" irq exit %d, %d.\n", smp_processor_id(), *pmode);
> + (*pmode)--;
> +
> + if (*pmode == 0) {
> + int cpu = smp_processor_id();
> + /* FIXME:
> + * This code is not NMI safe. especially:
> + * __rcu_remove_cpu acquires spinlocks.
> + */
> +
> + /*
> + * task 1: remove us from the list of cpus that might be inside critical
> + * sections and inform the global state machine that we are outside
> + * any read side critical sections.
> + */
> + __rcu_remove_cpu(&rcu_global_state_normal,&per_cpu(rcu_cpudata_normal, cpu), cpu);
> + __rcu_remove_cpu(&rcu_global_state_bh,&per_cpu(rcu_cpudata_bh, cpu), cpu);
> +
> + if (rcu_needs_cpu(cpu)) {
> + /*
> + * task 2: Someone did a call_rcu() in the interrupt.
> + * Duh, we've lost. Force a reschedule, that leaves nohz mode.
> + * FIXME: double check that this really works.
> + *
> + * Note: This can race: our call_rcu() might have set
> + * start_immediately. But: that start might happen before
> + * we re-add ourselves to the global cpu mask. Then we would
> + * not take part in the global cycle - and we would not set
> + * start_immediately again, either. The timeout would
> + * ensure forward progress, thus it's not that bad.
> + */
> + printk(KERN_ERR" irq exit %d - need resched .\n", cpu);
> + set_need_resched();
> + }
> + }
> + }
> +}
> +
> +#endif /* CONFIG_NO_HZ */
> +
> +static void rcu_init_percpu_data(struct rcu_global_state *rgs, struct rcu_cpu_state *rcs, int cpu)
> +{
> + __rcu_add_cpu(rgs, rcs, cpu);
> +
> + rcs->new = rcs->old = NULL;
> + rcs->newqlen = rcs->oldqlen = 0;
> }
>
> static void __cpuinit rcu_online_cpu(int cpu)
> {
> - struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
> - struct rcu_data *bh_rdp = &per_cpu(rcu_bh_data, cpu);
> + rcu_init_percpu_data(&rcu_global_state_normal, &per_cpu(rcu_cpudata_normal, cpu), cpu);
> + rcu_init_percpu_data(&rcu_global_state_bh, &per_cpu(rcu_cpudata_bh, cpu), cpu);
> +
> + per_cpu(rcu_cpumode, cpu) = RCU_CPUMODE_DELAYED;
> +
> + per_cpu(rcu_cpudata_dead, cpu).dead = NULL;
> + per_cpu(rcu_cpudata_dead, cpu).deadqlen = 0;
> + per_cpu(rcu_cpudata_dead, cpu).batchcount = RCU_BATCH_MIN;
>
> - rcu_init_percpu_data(cpu, &rcu_ctrlblk, rdp);
> - rcu_init_percpu_data(cpu, &rcu_bh_ctrlblk, bh_rdp);
> open_softirq(RCU_SOFTIRQ, rcu_process_callbacks);
> }
>
> @@ -743,6 +857,15 @@ static int __cpuinit rcu_cpu_notify(struct notifier_block *self,
> case CPU_UP_PREPARE_FROZEN:
> rcu_online_cpu(cpu);
> break;
> + case CPU_UP_CANCELED:
> + case CPU_UP_CANCELED_FROZEN:
> + /*
> + * During CPU_UP_PREPARE, the cpu is fully accounted for
> + * and added into the rcu_cpumask. Thus it must be properly
> + * removed if the CPU_UP failed.
> + * Therefore CPU_UP_CANCELED is equivalent to CPU_DEAD.
> + */
> + /* fall-through */
> case CPU_DEAD:
> case CPU_DEAD_FROZEN:
> rcu_offline_cpu(cpu);
> @@ -765,12 +888,12 @@ static struct notifier_block __cpuinitdata rcu_nb = {
> */
> void __init __rcu_init(void)
> {
> + rcu_cpumask_init(&rcu_global_state_normal.cpus, RCU_STATE_DESTROY, 0);
> + rcu_cpumask_init(&rcu_global_state_bh.cpus, RCU_STATE_DESTROY, 0);
> rcu_cpu_notify(&rcu_nb, CPU_UP_PREPARE,
> (void *)(long)smp_processor_id());
> /* Register notifier for non-boot CPUs */
> register_cpu_notifier(&rcu_nb);
> }
>
> -module_param(blimit, int, 0);
> -module_param(qhimark, int, 0);
> module_param(qlowmark, int, 0);
> diff --git a/kernel/rcucpumask.c b/kernel/rcucpumask.c
> new file mode 100644
> index 0000000..85ceb1e
> --- /dev/null
> +++ b/kernel/rcucpumask.c
> @@ -0,0 +1,119 @@
> +/*
> + * Scalable cpu mask for rcu.
> + *
> + * This program is free software; you can redistribute it and/or modify
> + * it under the terms of the GNU General Public License as published by
> + * the Free Software Foundation; either version 2 of the License, or
> + * (at your option) any later version.
> + *
> + * This program is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> + * GNU General Public License for more details.
> + *
> + * You should have received a copy of the GNU General Public License
> + * along with this program; if not, write to the Free Software
> + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
> + *
> + * (C) Manfred Spraul <manfred@colorfullife.com>, 2008
> + *
> + */
> +#include <linux/rcucpumask.h>
> +#include <linux/bug.h>
> +
> +#ifdef RCUCPUMASK_FLAT
> +
> +void rcu_cpumask_init(struct rcu_cpumask *rcm, int newstate, int setupcpus)
> +{
> + BUG_ON(!irqs_disabled());
> +
> + spin_lock(&rcm->lock);
> + rcm->state = newstate;
> +
> + if (setupcpus) {
> + rcm->cpus_open = rcm->cpus_total;
> +
> + bitmap_copy(cpus_addr(rcm->mask_cpu_open), cpus_addr(rcm->mask_cpu_total), NR_CPUS);
> + } else {
> + rcm->cpus_open = 0;
> + cpus_clear(rcm->mask_cpu_open);
> + }
> + spin_unlock(&rcm->lock);
> +}
> +
> +int rcu_cpumask_clear_and_test(struct rcu_cpumask *rcm, int cpu)
> +{
> + int ret;
> +
> + BUG_ON(!irqs_disabled());
> +
> + spin_lock(&rcm->lock);
> +
> + BUG_ON(!cpu_isset(cpu, rcm->mask_cpu_open