package org.apache.flink.runtime.io.network.api.writer;

import java.io.IOException;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.event.task.AbstractEvent;
import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;

/* loaded from: input_file:org/apache/flink/runtime/io/network/api/writer/RecordWriter.class */
public class RecordWriter<T extends IOReadableWritable> {
    protected final ResultPartitionWriter writer;
    private final ChannelSelector<T> channelSelector;
    private final int numChannels;
    private final RecordSerializer<T>[] serializers;

    public RecordWriter(ResultPartitionWriter resultPartitionWriter) {
        this(resultPartitionWriter, new RoundRobinChannelSelector());
    }

    public RecordWriter(ResultPartitionWriter resultPartitionWriter, ChannelSelector<T> channelSelector) {
        this.writer = resultPartitionWriter;
        this.channelSelector = channelSelector;
        this.numChannels = resultPartitionWriter.getNumberOfOutputChannels();
        this.serializers = new SpanningRecordSerializer[this.numChannels];
        for (int i = 0; i < this.numChannels; i++) {
            this.serializers[i] = new SpanningRecordSerializer();
        }
    }

    public void emit(T t) throws IOException, InterruptedException {
        for (int i : this.channelSelector.selectChannels(t, this.numChannels)) {
            RecordSerializer<T> recordSerializer = this.serializers[i];
            synchronized (recordSerializer) {
                RecordSerializer.SerializationResult addRecord = recordSerializer.addRecord(t);
                while (addRecord.isFullBuffer()) {
                    Buffer currentBuffer = recordSerializer.getCurrentBuffer();
                    if (currentBuffer != null) {
                        writeBuffer(currentBuffer, i, recordSerializer);
                    }
                    addRecord = recordSerializer.setNextBuffer(this.writer.getBufferProvider().requestBufferBlocking());
                }
            }
        }
    }

    public void broadcastEvent(AbstractEvent abstractEvent) throws IOException, InterruptedException {
        for (int i = 0; i < this.numChannels; i++) {
            RecordSerializer<T> recordSerializer = this.serializers[i];
            synchronized (recordSerializer) {
                if (recordSerializer.hasData()) {
                    Buffer currentBuffer = recordSerializer.getCurrentBuffer();
                    if (currentBuffer == null) {
                        throw new IllegalStateException("Serializer has data but no buffer.");
                    }
                    writeBuffer(currentBuffer, i, recordSerializer);
                    this.writer.writeEvent(abstractEvent, i);
                    recordSerializer.setNextBuffer(this.writer.getBufferProvider().requestBufferBlocking());
                } else {
                    this.writer.writeEvent(abstractEvent, i);
                }
            }
        }
    }

    public void sendEndOfSuperstep() throws IOException, InterruptedException {
        for (int i = 0; i < this.numChannels; i++) {
            RecordSerializer<T> recordSerializer = this.serializers[i];
            synchronized (recordSerializer) {
                Buffer currentBuffer = recordSerializer.getCurrentBuffer();
                if (currentBuffer != null) {
                    writeBuffer(currentBuffer, i, recordSerializer);
                    recordSerializer.setNextBuffer(this.writer.getBufferProvider().requestBufferBlocking());
                }
            }
        }
        this.writer.writeEndOfSuperstep();
    }

    public void flush() throws IOException {
        for (int i = 0; i < this.numChannels; i++) {
            RecordSerializer<T> recordSerializer = this.serializers[i];
            synchronized (recordSerializer) {
                try {
                    Buffer currentBuffer = recordSerializer.getCurrentBuffer();
                    if (currentBuffer != null) {
                        writeBuffer(currentBuffer, i, recordSerializer);
                    }
                    recordSerializer.clear();
                } finally {
                }
            }
        }
    }

    public void clearBuffers() {
        RecordSerializer<T>[] recordSerializerArr = this.serializers;
        int length = recordSerializerArr.length;
        for (int i = 0; i < length; i++) {
            RecordSerializer<T> recordSerializer = recordSerializerArr[i];
            synchronized (recordSerializer) {
                try {
                    Buffer currentBuffer = recordSerializer.getCurrentBuffer();
                    if (currentBuffer != null) {
                        currentBuffer.recycle();
                    }
                    recordSerializer.clear();
                } finally {
                }
            }
        }
    }

    private void writeBuffer(Buffer buffer, int i, RecordSerializer<T> recordSerializer) throws IOException {
        try {
            this.writer.writeBuffer(buffer, i);
            recordSerializer.clearCurrentBuffer();
        } catch (Throwable th) {
            recordSerializer.clearCurrentBuffer();
            throw th;
        }
    }
}
