Java分布式学习入门教程
2024/9/25 23:02:54
本文主要是介绍Java分布式学习入门教程,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
Java分布式学习入门涵盖了分布式系统的基本概念、Java在网络环境中的应用、分布式系统的优势与挑战以及网络编程和数据一致性等内容。文章详细介绍了Java RMI、MapReduce、Apache Spark等关键技术,并探讨了分布式存储系统和消息队列的应用。通过丰富的示例代码,读者可以深入理解Java在分布式环境中的开发和实际应用。本文旨在为初学者提供全面的Java分布式系统入门指南。
分布式系统的基本概念
分布式系统是指一组通过网络相互连接的计算机系统,它们协同工作以共同完成一个或多个相关的任务。这些系统中的每个计算机都称为节点,它们通过网络协议进行通信和协调。分布式系统的目的是提高系统的可用性、可靠性、可扩展性和性能,同时降低单点故障的风险。
Java在分布式系统中的应用
Java是一种广泛使用的编程语言,它在分布式系统中有着重要的应用。Java平台提供了丰富的API和框架,支持在网络环境中进行各种分布式应用开发。例如,Java RMI(Remote Method Invocation)允许程序通过网络调用远程对象的方法,Java EE(Enterprise Edition)提供了诸如EJB(Enterprise JavaBeans)、JMS(Java Message Service)等组件,用于构建分布式企业应用。
分布式系统的优势与挑战
优势:
- 可扩展性:分布式系统可以通过增加节点轻松扩展系统容量。
- 可靠性:即使部分节点失效,整个系统仍可以正常运行。
- 性能改进:通过任务的并行处理,可以显著提高性能。
- 资源利用:更有效地利用资源,特别是在大规模系统中。
挑战:
- 复杂性:系统的复杂性随着节点数量的增加而增加。
- 一致性:保持数据的一致性是一个重要的挑战。
- 通信延迟:网络延迟可能会影响系统的响应时间。
- 安全性:分布式系统更容易受到安全攻击。
Java网络编程入门
Java提供了丰富的网络编程API,包括java.net
包中的Socket
和ServerSocket
类,以及java.nio
包中的非阻塞I/O支持,这些API可以用来构建基本的网络应用。
使用Socket进行基本的网络通信
Socket编程是Java网络编程的基础,它允许两个程序通过网络进行通信。下面是一个简单的示例,展示如何使用Socket建立客户端和服务器端的通信。
服务器端代码:
import java.io.*; import java.net.*; public class SimpleServer { public static void main(String[] args) throws IOException { ServerSocket serverSocket = new ServerSocket(9090); System.out.println("Server started on port 9090"); while (true) { Socket clientSocket = serverSocket.accept(); System.out.println("New client connected"); newClientHandler(clientSocket); } } private static void newClientHandler(Socket clientSocket) { new Thread(() -> { try (BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true)) { String inputLine; while ((inputLine = in.readLine()) != null) { System.out.println("Received: " + inputLine); out.println("Echo: " + inputLine); } } catch (IOException e) { e.printStackTrace(); } }).start(); } }
客户端代码:
import java.io.*; import java.net.*; public class SimpleClient { public static void main(String[] args) throws IOException { Socket socket = new Socket("localhost", 9090); PrintWriter out = new PrintWriter(socket.getOutputStream(), true); BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); String userInput; BufferedReader stdIn = new BufferedReader(new InputStreamReader(System.in)); String serverResponse; while ((userInput = stdIn.readLine()) != null) { out.println(userInput); serverResponse = in.readLine(); System.out.println("Server Echo: " + serverResponse); } socket.close(); } }
使用java.nio实现非阻塞I/O
下面是一个使用java.nio
包实现非阻塞I/O的简单示例。
服务器端代码:
import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; public class NioServer { public static void main(String[] args) throws Exception { Selector selector = Selector.open(); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(9090)); serverSocketChannel.configureBlocking(false); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (true) { selector.select(); Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator(); while (selectedKeys.hasNext()) { SelectionKey selectionKey = selectedKeys.next(); if (selectionKey.isAcceptable()) { SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); } else if (selectionKey.isReadable()) { SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); int read = socketChannel.read(buffer); if (read == -1) { socketChannel.close(); } else { buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String receivedMessage = new String(bytes); System.out.println("Received: " + receivedMessage); ByteBuffer responseBuffer = ByteBuffer.wrap("Echo: " + receivedMessage).array(); socketChannel.write(ByteBuffer.wrap(responseBuffer)); } } selectedKeys.remove(); } } } }
客户端代码:
import java.io.*; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Scanner; public class NioClient { public static void main(String[] args) throws Exception { Selector selector = Selector.open(); SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9090)); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); Scanner scanner = new Scanner(System.in); while (true) { selector.select(); Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator(); while (selectedKeys.hasNext()) { SelectionKey selectionKey = selectedKeys.next(); if (selectionKey.isReadable()) { SocketChannel socketChannel1 = (SocketChannel) selectionKey.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); int read = socketChannel1.read(buffer); if (read == -1) { socketChannel.close(); break; } buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String receivedMessage = new String(bytes); System.out.println("Server Echo: " + receivedMessage); } else if (selectionKey.isWritable()) { String userInput = scanner.nextLine(); ByteBuffer buffer = ByteBuffer.wrap(userInput.getBytes()); socketChannel.write(buffer); socketChannel.register(selector, SelectionKey.OP_READ); } selectedKeys.remove(); } } } }
了解Java RMI(远程方法调用)
Java RMI(Remote Method Invocation)允许程序通过网络调用远程对象的方法。RMI是Java提供的一种分布计算模型,它支持在不同的Java虚拟机之间进行对象的远程调用。
RMI的基本概念:
- 远程对象:通过RMI在网络中公开的方法的对象。
- Stub和Skeleton:当客户端请求调用远程对象的方法时,RMI会创建一个Stub对象来代理远程对象的调用,并创建一个Skeleton对象来处理网络通信。
- Registry:RMI注册表用于存储远程对象的引用,客户端可以通过这个注册表来查找和调用远程对象的方法。
RMI的简单示例:
服务端代码:
import java.rmi.Remote; import java.rmi.RemoteException; import java.rmi.registry.LocateRegistry; import java.rmi.registry.Registry; public interface MyRemote extends Remote { String sayHello() throws RemoteException; } public class MyRemoteImpl implements MyRemote { @Override public String sayHello() throws RemoteException { return "Hello from RMI server!"; } } public class RmiServer { public static void main(String[] args) { try { MyRemoteImpl myRemoteImpl = new MyRemoteImpl(); Registry registry = LocateRegistry.createRegistry(1099); registry.rebind("MyRemote", myRemoteImpl); System.out.println("RMI server is running..."); } catch (Exception e) { e.printStackTrace(); } } }
客户端代码:
import java.rmi.RemoteException; import java.rmi.registry.LocateRegistry; import java.rmi.registry.Registry; public class RmiClient { public static void main(String[] args) { try { Registry registry = LocateRegistry.getRegistry(1099); MyRemote myRemote = (MyRemote) registry.lookup("MyRemote"); String hello = myRemote.sayHello(); System.out.println(hello); } catch (Exception e) { e.printStackTrace(); } } }
数据一致性
实现数据一致性的策略与工具介绍
实现数据一致性的策略包括:
- 两阶段提交:这是处理分布式事务的一种方法。
- 乐观锁:通过版本号来检查数据是否被其他事务修改。
- 悲观锁:通过锁定资源来防止数据被修改。
常用的工具包括:
- ZooKeeper:一个分布式协调服务,可以用于实现分布式锁。
- Etcd:一个可靠的分布式键值存储,常用于实现分布式锁和服务发现。
示例代码(使用ZooKeeper实现分布式锁)
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import java.io.IOException; import java.util.concurrent.CountDownLatch; public class ZooKeeperDistributedLock { private static final String ZK_ADDRESS = "localhost:2181"; private static final String ZK_LOCK_PATH = "/distributedlock"; private ZooKeeper zooKeeper; public ZooKeeperDistributedLock() throws IOException { zooKeeper = new ZooKeeper(ZK_ADDRESS, 5000, event -> { if (ZooKeeper.States.SyncConnected.equals(event.getState())) { ZooKeeperDistributedLock.this.connectLatch.countDown(); } }); connectLatch = new CountDownLatch(1); try { connectLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } private CountDownLatch connectLatch; public boolean lock() throws InterruptedException, KeeperException { String path = zooKeeper.create(ZK_LOCK_PATH + "/lock-", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); Stat stat = null; while (true) { String[] children = zooKeeper.getChildren(ZK_LOCK_PATH, null); String currentLockName = new String(path).substring(ZK_LOCK_PATH.length() + 1); int currentLockIndex = Integer.parseInt(currentLockName); for (String child : children) { String lockName = new String(child).substring(ZK_LOCK_PATH.length() + 1); int lockIndex = Integer.parseInt(lockName); if (currentLockIndex > lockIndex) { path = zooKeeper.create(ZK_LOCK_PATH + "/lock-", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); } else if (lockIndex == currentLockIndex + 1) { stat = zooKeeper.exists(ZK_LOCK_PATH + "/" + lockName, null); if (stat == null) { return true; } } } Thread.sleep(1000); } } public void unlock() throws InterruptedException, KeeperException { zooKeeper.delete(path, -1); } }
分布式计算框架
MapReduce原理与应用
MapReduce是一种编程模型,用于大规模数据集的并行处理。它将复杂的任务分解成多个更小的任务,这些任务可以并行执行。MapReduce框架可以运行在各种分布式计算环境中,如Hadoop和Spark。
MapReduce流程:
- Map阶段:将输入数据分成多个块,每个块由一个Mapper处理。
- Shuffle阶段:Mapper将结果分组,并将它们发送到Reducer。
- Reduce阶段:Reducer处理所有的分组数据,并最终输出结果。
示例代码(使用Hadoop MapReduce):
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String[] words = value.toString().split("\\s+"); for (String w : words) { word.set(w); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
示例代码(使用其他输入输出格式)
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import java.io.IOException; public class SequenceFileWordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String[] words = value.toString().split("\\s+"); for (String w : words) { word.set(w); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "sequence file word count"); job.setJarByClass(SequenceFileWordCount.class); job.setMapperClass(TokenizerMapper.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setInputFormatClass(SequenceFileInputFormat.class); job.setOutputFormatClass(SequenceFileOutputFormat.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
Apache Spark简介及其简单使用
Apache Spark是一个快速、通用的分布式计算系统,它支持大规模数据集的处理。Spark提供了丰富的API,支持多种编程语言如Java、Scala等,可以用于构建各种数据处理应用。
Spark的基本概念:
- RDD(Resilient Distributed Dataset):是Spark的核心抽象,它是不可变的、分区的、容错的数据集。
- Transformation:转换操作,如map、filter等,它们会生成新的RDD。
- Action:动作操作,如reduce、collect等,它们会触发计算并返回结果。
示例代码(使用Java API):
import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import java.util.Arrays; public class SimpleSpark { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("Simple App").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> lines = sc.textFile("input.txt"); JavaRDD<String> words = lines.flatMap((Function<String, Iterable<String>>) line -> Arrays.asList(line.split("\\s+")).iterator()); JavaRDD<Integer> lengths = words.map(String::length); int totalLength = lengths.reduce((a, b) -> a + b); System.out.println(totalLength); } }
示例代码(使用其他输入输出格式)
import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import scala.Tuple2; import java.util.Arrays; public class SparkWordCount { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("Spark Word Count").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); JavaPairRDD<String, Integer> counts = sc.textFile("input.txt") .flatMap((FlatMapFunction<String, String>) line -> Arrays.asList(line.split("\\s+")).iterator()) .mapToPair((Function<String, Tuple2<String, Integer>>) word -> new Tuple2<>(word, 1)) .reduceByKey((Function2<Integer, Integer, Integer>) (a, b) -> a + b); counts.saveAsTextFile("output"); } }
分布式存储系统
分布式文件系统(如HDFS)
分布式文件系统(DFS)是一种文件系统,它将数据分布在多个节点上,以提高系统的容错性和可扩展性。Hadoop Distributed File System(HDFS)是Hadoop生态系统中的分布式文件系统,它提供了高吞吐量的数据访问,适用于大规模数据集的分布式存储和处理。
HDFS的基本概念:
- NameNode:负责管理文件系统的命名空间和客户端对文件的访问。
- DataNode:存储实际的数据块。
HDFS的简单使用示例:
# 启动NameNode和DataNode hdfs namenode -format hdfs namenode & hdfs datanode & # 创建HDFS目录 hadoop fs -mkdir /user hadoop fs -mkdir /user/username # 从本地文件上传到HDFS hadoop fs -put localfile /user/username/ # 从HDFS下载文件到本地 hadoop fs -get /user/username/localfile localfile
示例代码(使用Java API访问HDFS)
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; public class HdfsExample { public static void main(String[] args) throws IOException { Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); // 从HDFS读取文件 Path hdfsPath = new Path("/user/username/localfile"); FileInputStream fis = new FileInputStream(hdfsPath.toString()); FileOutputStream fos = new FileOutputStream("localfile"); IOUtils.copyBytes(fis, fos, 1024, false); // 将文件上传到HDFS Path localPath = new Path("localfile"); fs.copyFromLocalFile(false, localPath, hdfsPath); } }
分布式数据库(如Cassandra、HBase)
分布式数据库是分布式系统中用于存储和管理数据的数据库系统。常见的分布式数据库包括Cassandra和HBase。
Cassandra的优点:
- 高可用性:没有单点故障。
- 弹性伸缩:可以水平扩展。
- 数据分区:数据可以分布在多个节点上。
HBase的优点:
- 列存储:适用于稀疏数据。
- 实时读写:支持实时读写操作。
- 强大的扩展性:支持水平扩展。
示例代码(使用Java API访问HBase)
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; import java.io.IOException; import java.util.Properties; public class HBaseExample { public static void main(String[] args) throws IOException { Properties props = HBaseConfiguration.create(); props.setProperty("hbase.zookeeper.quorum", "localhost"); props.setProperty("hbase.zookeeper.property.clientPort", "2181"); Connection connection = ConnectionFactory.createConnection(props); Table table = connection.getTable(TableName.valueOf("mytable")); Put put = new Put(Bytes.toBytes("row1")); put.addColumn(Bytes.toBytes("family"), Bytes.toBytes("qualifier"), Bytes.toBytes("value")); table.put(put); } }
示例代码(使用Java API访问Cassandra)
import com.datastax.driver.core.Cluster; import com.datastax.driver.core.Session; import java.util.UUID; public class CassandraExample { public static void main(String[] args) { Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build(); Session session = cluster.connect("mykeyspace"); session.execute("CREATE TABLE IF NOT EXISTS users (id UUID PRIMARY KEY, name text, age int)"); session.execute("INSERT INTO users (id, name, age) VALUES (?, ?, ?)", UUID.randomUUID(), "John Doe", 30); String cql = "SELECT * FROM users"; for (com.datastax.driver.core.Row row : session.execute(cql)) { System.out.printf("User ID: %s Name: %s Age: %d\n", row.getUUID("id"), row.getString("name"), row.getInt("age")); } cluster.close(); } }
存储系统的设计与实现基础
设计分布式存储系统时,需要考虑的因素包括:
- 数据分布:如何将数据分布在各个节点上。
- 数据冗余:如何保证数据的可靠性,通常通过数据冗余来实现。
- 读写一致性:如何保证数据的一致性。
- 容错性:如何处理节点故障。
- 扩展性:如何支持系统规模的扩展。
分布式消息队列与缓存
消息队列的作用及应用场景
消息队列(MQ)用于在分布式系统中传递消息,它能够异步地处理数据,从而提高系统的响应速度和可靠性。消息队列的应用场景包括:
- 异步处理:将耗时的操作从主线程中分离出来,提高系统的响应速度。
- 解耦:将不同的服务解耦,使得服务之间可以独立开发和部署。
- 削峰填谷:在系统负载高峰期,通过消息队列积累请求,达到削峰填谷的效果。
- 可靠传输:保证消息的可靠传输。
缓存机制的重要性及其在分布式系统中的应用
缓存机制可以在分布式系统中提高性能和响应速度。常见的缓存机制包括:
- 内存缓存:如Redis、Memcached等,将数据存储在内存中,提供快速的访问。
- 分布式缓存:如Apache Ignite、Caffeine等,将缓存分布在多个节点上,提高系统的可用性和可靠性。
示例代码(使用Redis作为缓存)
import redis.clients.jedis.Jedis; public class RedisCacheExample { public static void main(String[] args) { Jedis jedis = new Jedis("localhost"); jedis.set("key", "value"); String value = jedis.get("key"); System.out.println(value); // 输出 "value" jedis.close(); } }
示例代码(使用Apache Ignite作为分布式缓存)
import org.apache.ignite.Ignition; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import java.util.Properties; public class IgniteExample { public static void main(String[] args) { IgniteConfiguration igniteConf = new IgniteConfiguration(); CacheConfiguration<String, String> cacheCfg = new CacheConfiguration<>(); cacheCfg.setCacheMode(CacheMode.PARTITIONED); cacheCfg.setName("myCache"); igniteConf.setCacheConfiguration(cacheCfg); igniteConf.setClientMode(true); Ignition.setClientMode(true); try (org.apache.ignite.Ignite ignite = Ignition.start(igniteConf)) { ignite.getOrCreateCache(cacheCfg).put("key", "value"); System.out.println(ignite.getOrCreateCache(cacheCfg).get("key")); } } }
常见消息队列与缓存系统(如Kafka、Redis)的简单使用
Kafka简介及其简单使用
Kafka是一种高吞吐量的消息队列系统,它广泛应用于日志聚合、流处理等场景。
示例代码(使用Java API访问Kafka)
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class KafkaProducerExample { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer = new KafkaProducer<>(props); ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value"); producer.send(record); producer.close(); } }
示例代码(使用Java API消费Kafka消息)
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class KafkaConsumerExample { public static void main(String[] args) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("my-topic"), new ConsumerRebalanceListener() { @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { } @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { } }); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); } } } }
以上是Java分布式学习入门教程的详细内容,涵盖了分布式系统的基本概念、Java网络编程、数据一致性、分布式计算框架、分布式存储系统以及消息队列与缓存的相关知识。通过这些概念和示例代码,希望读者能够对Java分布式系统有一个全面的理解。
这篇关于Java分布式学习入门教程的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-09-25Seata初识学习入门:轻松掌握分布式事务管理
- 2024-09-25阿里云部署方案学习入门:新手必读指南
- 2024-09-25阿里云RDS学习入门指南
- 2024-09-25令牌锁功能学习入门:初学者指南
- 2024-09-25秒杀令牌初始化学习入门指南
- 2024-09-25JAVA分布式学习入门指南
- 2024-09-25Java分布式学习入门指南
- 2024-09-25Java微服务学习入门:从零开始的全面指南
- 2024-09-25数据结构和算法真题解析与实战演练
- 2024-09-25数据结构和算法考点详解与入门教程