/*
 * Decompiled with CFR 0.152.
 */
package org.jboss.mq.server;

import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import javax.jms.JMSException;
import org.jboss.logging.Logger;
import org.jboss.mq.AcknowledgementRequest;
import org.jboss.mq.ConnectionToken;
import org.jboss.mq.ReceiveRequest;
import org.jboss.mq.SpyMessage;
import org.jboss.mq.Subscription;
import org.jboss.mq.pm.Tx;
import org.jboss.mq.server.JMSDestination;
import org.jboss.mq.server.JMSDestinationManager;
import org.jboss.mq.server.JMSTopic;
import org.jboss.mq.server.RoutedMessage;
import org.jboss.mq.threadpool.ThreadPool;
import org.jboss.mq.threadpool.Work;

public class ClientConsumer
implements Work {
    private Logger log;
    JMSDestinationManager server;
    ConnectionToken connectionToken;
    boolean enabled;
    boolean closed = false;
    HashMap subscriptions = new HashMap();
    HashMap removedSubscriptions = new HashMap();
    LinkedList blockedSubscriptions = new LinkedList();
    private LinkedList messages = new LinkedList();
    private boolean enqueued = false;
    private static ThreadPool threadPool = null;
    static /* synthetic */ Class class$org$jboss$mq$server$ClientConsumer;

    public ClientConsumer(JMSDestinationManager server, ConnectionToken connectionToken) throws JMSException {
        this.server = server;
        this.connectionToken = connectionToken;
        this.log = Logger.getLogger((String)((class$org$jboss$mq$server$ClientConsumer == null ? (class$org$jboss$mq$server$ClientConsumer = ClientConsumer.class$("org.jboss.mq.server.ClientConsumer")) : class$org$jboss$mq$server$ClientConsumer).getName() + ":" + connectionToken.getClientID()));
        Class clazz = class$org$jboss$mq$server$ClientConsumer == null ? (class$org$jboss$mq$server$ClientConsumer = ClientConsumer.class$("org.jboss.mq.server.ClientConsumer")) : class$org$jboss$mq$server$ClientConsumer;
        synchronized (clazz) {
            if (threadPool == null) {
                threadPool = new ThreadPool("Message Pushers", server.threadGroup, 10, true);
            }
        }
    }

    public void setEnabled(boolean enabled) throws JMSException {
        if (this.log.isTraceEnabled()) {
            this.log.trace((Object)("" + this + "->setEnabled(enabled=" + enabled + ")"));
        }
        this.enabled = enabled;
        if (enabled) {
            LinkedList linkedList = this.blockedSubscriptions;
            synchronized (linkedList) {
                Iterator it = this.blockedSubscriptions.iterator();
                while (it.hasNext()) {
                    Subscription sub = (Subscription)it.next();
                    JMSDestination dest = this.server.getJMSDestination(sub.destination);
                    if (dest == null) continue;
                    dest.addReceiver(sub);
                }
                this.blockedSubscriptions.clear();
            }
        }
    }

    public void queueMessageForSending(RoutedMessage r) {
        LinkedList linkedList = this.messages;
        synchronized (linkedList) {
            if (this.closed) {
                return;
            }
            this.messages.add(r);
            if (!this.enqueued) {
                threadPool.enqueueWork(this);
                this.enqueued = true;
            }
        }
    }

    public void addSubscription(Subscription req) throws JMSException {
        if (this.log.isTraceEnabled()) {
            this.log.trace((Object)("Adding subscription for: " + req));
        }
        req.connectionToken = this.connectionToken;
        req.clientConsumer = this;
        JMSDestination jmsdest = this.server.getJMSDestination(req.destination);
        if (jmsdest == null) {
            throw new JMSException("The destination " + req.destination + " does not exist !");
        }
        jmsdest.addSubscriber(req);
        HashMap hashMap = this.subscriptions;
        synchronized (hashMap) {
            HashMap subscriptionsClone = (HashMap)this.subscriptions.clone();
            subscriptionsClone.put(new Integer(req.subscriptionId), req);
            this.subscriptions = subscriptionsClone;
        }
    }

    public void close() {
        boolean trace = this.log.isTraceEnabled();
        if (trace) {
            this.log.trace((Object)("" + this + "->close()"));
        }
        LinkedList linkedList = this.messages;
        synchronized (linkedList) {
            this.closed = true;
            if (this.enqueued) {
                if (trace) {
                    this.log.trace((Object)("" + this + "->close(): Cancelling work in progress."));
                }
                threadPool.cancelWork(this);
                this.enqueued = false;
            }
        }
        HashMap hashMap = this.subscriptions;
        synchronized (hashMap) {
            Iterator i = this.subscriptions.keySet().iterator();
            while (i.hasNext()) {
                Integer subscriptionId = (Integer)i.next();
                try {
                    this.removeSubscription(subscriptionId);
                }
                catch (JMSException ignore) {
                    // empty catch block
                }
            }
        }
        HashMap removedSubsClone = (HashMap)this.removedSubscriptions.clone();
        Iterator i = removedSubsClone.values().iterator();
        while (i.hasNext()) {
            Subscription removed = (Subscription)i.next();
            JMSDestination queue = this.server.getJMSDestination(removed.destination);
            if (queue == null) {
                this.log.warn((Object)("The subscription was registered with a destination that does not exist: " + removed));
            }
            try {
                queue.nackMessages(removed);
            }
            catch (JMSException e) {
                this.log.warn((Object)("Unable to nack removed subscription: " + removed), (Throwable)e);
            }
        }
    }

    public SpyMessage receive(int subscriberId, long wait) throws JMSException {
        Subscription req = (Subscription)this.subscriptions.get(new Integer(subscriberId));
        if (req == null) {
            throw new JMSException("The provided subscription does not exist");
        }
        JMSDestination queue = this.server.getJMSDestination(req.destination);
        if (queue == null) {
            throw new JMSException("The subscription's destination " + req.destination + " does not exist");
        }
        if (this.enabled) {
            return queue.receive(req, wait != -1L);
        }
        if (wait != -1L) {
            this.addBlockedSubscription(req);
        }
        return null;
    }

    public void removeSubscription(int subscriptionId) throws JMSException {
        Subscription req;
        if (this.log.isTraceEnabled()) {
            this.log.trace((Object)("" + this + "->removeSubscription(subscriberId=" + subscriptionId + ")"));
        }
        Integer subId = new Integer(subscriptionId);
        HashMap hashMap = this.subscriptions;
        synchronized (hashMap) {
            HashMap subscriptionsClone = (HashMap)this.subscriptions.clone();
            req = (Subscription)subscriptionsClone.remove(subId);
            this.subscriptions = subscriptionsClone;
            if (req != null) {
                this.removedSubscriptions.put(subId, req);
            }
        }
        if (req == null) {
            throw new JMSException("The subscription had not been previously registered");
        }
        JMSDestination queue = this.server.getJMSDestination(req.destination);
        if (queue == null) {
            throw new JMSException("The subscription was registed with a destination that does not exist !");
        }
        queue.removeSubscriber(req);
    }

    public void doWork() {
        try {
            ReceiveRequest[] job;
            LinkedList linkedList = this.messages;
            synchronized (linkedList) {
                if (this.closed) {
                    return;
                }
                job = new ReceiveRequest[this.messages.size()];
                Iterator iter = this.messages.iterator();
                int i = 0;
                while (iter.hasNext()) {
                    RoutedMessage rm = (RoutedMessage)iter.next();
                    job[i] = rm.toReceiveRequest();
                    iter.remove();
                    ++i;
                }
                this.enqueued = false;
            }
            this.connectionToken.clientIL.receive(job);
        }
        catch (Exception e) {
            this.log.warn((Object)"Could not send messages to a receiver.", (Throwable)e);
            try {
                this.server.connectionFailure(this.connectionToken);
            }
            catch (Throwable ignore) {
                this.log.warn((Object)"Could not close the client connection..", ignore);
            }
        }
    }

    public String toString() {
        return "ClientConsumer:" + this.connectionToken.getClientID();
    }

    public void acknowledge(AcknowledgementRequest request, Tx txId) throws JMSException {
        Subscription sub = (Subscription)this.subscriptions.get(new Integer(request.subscriberId));
        if (sub == null) {
            HashMap hashMap = this.subscriptions;
            synchronized (hashMap) {
                sub = (Subscription)this.removedSubscriptions.get(new Integer(request.subscriberId));
            }
        }
        if (sub == null) {
            throw new JMSException("The provided subscription does not exist");
        }
        JMSDestination queue = this.server.getJMSDestination(sub.destination);
        if (queue == null) {
            throw new JMSException("The subscription's destination " + sub.destination + " does not exist");
        }
        queue.acknowledge(request, sub, txId);
    }

    void addBlockedSubscription(Subscription sub) {
        LinkedList linkedList = this.blockedSubscriptions;
        synchronized (linkedList) {
            this.blockedSubscriptions.add(sub);
        }
    }

    void removeRemovedSubscription(int subId) {
        JMSDestination topic;
        Subscription sub = null;
        HashMap hashMap = this.subscriptions;
        synchronized (hashMap) {
            sub = (Subscription)this.removedSubscriptions.remove(new Integer(subId));
        }
        if (sub != null && (topic = this.server.getJMSDestination(sub.destination)) instanceof JMSTopic) {
            ((JMSTopic)topic).cleanupSubscription(sub);
        }
    }

    public Subscription getSubscription(int subscriberId) throws JMSException {
        Subscription req = (Subscription)this.subscriptions.get(new Integer(subscriberId));
        if (req == null) {
            throw new JMSException("The provided subscription does not exist");
        }
        return req;
    }

    static /* synthetic */ Class class$(String x0) {
        try {
            return Class.forName(x0);
        }
        catch (ClassNotFoundException x1) {
            throw new NoClassDefFoundError(x1.getMessage());
        }
    }
}

