Hello and welcome back to the gRPC course.
In this lecture, we will learn how to implement client-streaming RPC in Java.
We’re gonna implement an API
that allows clients to upload a laptop image file in multiple chunks.
OK let’s start!
This is the pcbook-java project that we’ve been working on.
The first thing we need to do is to define the new upload image RPC.
As we’ve already done that in the previous video with Golang,
I will just open the pcbook golang project,
And copy-paste the content of the laptop_service.proto file.
Here we have the UploadImageRequest message.
It has a oneof data field,
which can either be image info, or a chunk of image data.
The ImageInfo contains the laptop ID and the image type, such as .jpg or .png.
The chunk_data is a sequence of bytes.
The idea is that we will divide the image into multiple chunks of 1 kilobyte
And send them to the server sequentially via the stream.
Then the server will send back 1 single response,
Which contains the ID of the uploaded image
And the total size of that image.
So the UploadImage RPC will take a stream of UploadImageRequest as input
And return an UploadImageResponse.
Alright, let’s build the project to generate the Java code.
The build is successful.
Now before we implement the RPC,
We will need to add a new store to save the uploaded image.
I will create a new ImageStore interface.
It has 1 function: Save,
Which takes the laptopID, the imageType, and the imageData as input
And returns the imageID, or throws an IOException.
Let’s say we want to store the image on disk, and its metadata in memory.
So I will create a new DiskImageStore class to implement this interface.
In this class, we need a field to tell us where to store the images.
We also need a concurrent map to store the metadata of the images.
The key of the map is the image ID, and its value is the metadata.
I will create a new class for the ImageMetadata.
In this class, we will store the laptop ID,
The type of the image,
And the path to the image on disk.
Let’s write a constructor to initialize the object.
And also create some getter functions for each of the fields.
OK, now go back to our DiskImageStore.
First we create a new constructor that takes only the imageFolder as input.
We initialize the data map with a new ConcurrentHashMap.
Then in the Save function,
We generate a random UUID that will be used as the image ID.
We make the path to store the image
by joining the imageFolder, imageID, and imageType together.
Then we create a new FileOutputStream with the image path.
We call imageData.writeTo() to write the image data to that file output stream.
And close the output stream.
Once the file is successfully written to disk,
We create a new metadata object,
And put it into the data map with the imageID as the key.
Finally, we return the imageID.
And we’re done with the DiskImageStore.
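Here is a minimal sketch of what the store described above could look like. The field names, the path format, and the use of try-with-resources to close the stream are my assumptions, not necessarily the exact code on screen. It also assumes the imageType already includes the leading dot, like ".jpg".

```java
import java.io.ByteArrayOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

interface ImageStore {
    // Capitalized to match the Save call used in this lecture.
    String Save(String laptopID, String imageType, ByteArrayOutputStream imageData) throws IOException;
}

class ImageMetadata {
    private final String laptopID;
    private final String type;
    private final String path;

    ImageMetadata(String laptopID, String type, String path) {
        this.laptopID = laptopID;
        this.type = type;
        this.path = path;
    }

    String getLaptopID() { return laptopID; }
    String getType() { return type; }
    String getPath() { return path; }
}

class DiskImageStore implements ImageStore {
    private final String imageFolder;
    // Key: image ID, value: metadata of that image (kept in memory only).
    private final ConcurrentMap<String, ImageMetadata> data;

    DiskImageStore(String imageFolder) {
        this.imageFolder = imageFolder;
        this.data = new ConcurrentHashMap<>();
    }

    @Override
    public String Save(String laptopID, String imageType, ByteArrayOutputStream imageData) throws IOException {
        String imageID = UUID.randomUUID().toString();
        // Assumption: imageType includes the leading dot, e.g. ".jpg".
        String imagePath = String.format("%s/%s%s", imageFolder, imageID, imageType);

        // try-with-resources closes the stream even if the write fails.
        try (FileOutputStream out = new FileOutputStream(imagePath)) {
            imageData.writeTo(out);
        }

        // Record the metadata only after the file has been written.
        data.put(imageID, new ImageMetadata(laptopID, imageType, imagePath));
        return imageID;
    }
}
```

Note that the image folder must already exist, otherwise the FileOutputStream constructor throws an exception.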
Now let’s implement the uploadImage RPC in the LaptopService class.
First I will change this store field to laptopStore.
Then we will add a new field for the imageStore.
Also add it to this constructor.
OK, now we need to override the uploadImage() method.
As you can see, this method has a responseObserver parameter
That will be used to send the response to the client,
Just like the way it works in the searchLaptop RPC.
How about the stream of requests?
This is very different from server-streaming RPC,
Because the request stream is not an input parameter, but the return value of this function instead.
Here we can see that
the uploadImage function must return a StreamObserver of UploadImageRequest
And this StreamObserver is just an interface with 3 functions:
onNext, onError, and onCompleted.
What we need to do is to return an implementation of this interface.
So let’s do that.
First we define 3 fields: laptopID, imageType, and imageData.
Now in the onNext() function,
We check the data case.
If it is image info,
We write a simple log saying that we have received the image info.
Then we get the laptopID and imageType from that info.
We also initialize the imageData as a new ByteArrayOutputStream.
Else, it must be a new data chunk.
So we get the chunk from the request.
Write a log here saying that we’ve received a chunk with this size.
Then we check if the imageData is null or not.
If it is null, it means that the client hasn’t sent the image info,
So we just send an error with INVALID_ARGUMENT status,
And return immediately.
Otherwise, we just call the chunkData.writeTo() function
To add this chunk to the image data.
If we catch an exception,
We just send an INTERNAL error to the client.
That’s it for the onNext() function.
The onError() function is called
whenever an error occurs while the server is receiving stream data.
So here we just need to write a warning log.
OK, now let’s implement the onCompleted() function.
When this function is called,
it means that the server has received all image chunk data.
So we just call imageStore.Save() to save the image data to the store.
Surround this call with a try-catch.
If an error is caught,
We call responseObserver.onError() to send it to the client.
We save the output imageID to a variable,
And also get the total image size.
Then we build a new UploadImageResponse object
With the imageID and imageSize.
We call responseObserver.onNext() to send the response to the client,
And finally call responseObserver.onCompleted() to finish it.
OK the uploadImage RPC is ready.
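To make the flow of the three callbacks easier to follow, here is a rough sketch that runs without gRPC. The StreamObserver interface below is a local stand-in with the same three methods as io.grpc.stub.StreamObserver, and UploadRequest, UploadResponse, and UploadSketch are hypothetical stand-ins for the generated messages and the service class. The call to imageStore.Save() is replaced by a random UUID.

```java
import java.io.ByteArrayOutputStream;
import java.util.UUID;
import java.util.logging.Logger;

// Local stand-in with the same three methods as io.grpc.stub.StreamObserver.
interface StreamObserver<T> {
    void onNext(T value);
    void onError(Throwable t);
    void onCompleted();
}

// Hypothetical stand-in for the generated UploadImageRequest (oneof: info or chunk).
class UploadRequest {
    final String laptopID;
    final String imageType;
    final byte[] chunk; // null when this request carries the image info

    private UploadRequest(String laptopID, String imageType, byte[] chunk) {
        this.laptopID = laptopID;
        this.imageType = imageType;
        this.chunk = chunk;
    }

    static UploadRequest info(String laptopID, String imageType) {
        return new UploadRequest(laptopID, imageType, null);
    }

    static UploadRequest chunk(byte[] data) {
        return new UploadRequest(null, null, data);
    }
}

// Hypothetical stand-in for the generated UploadImageResponse.
class UploadResponse {
    final String id;
    final int size;

    UploadResponse(String id, int size) {
        this.id = id;
        this.size = size;
    }
}

class UploadSketch {
    private static final Logger logger = Logger.getLogger(UploadSketch.class.getName());

    // The request stream is the return value; responses go through the parameter.
    static StreamObserver<UploadRequest> uploadImage(StreamObserver<UploadResponse> responseObserver) {
        return new StreamObserver<UploadRequest>() {
            private String laptopID;
            private String imageType;
            private ByteArrayOutputStream imageData;

            @Override
            public void onNext(UploadRequest request) {
                if (request.chunk == null) {
                    logger.info("receive image info");
                    laptopID = request.laptopID;
                    imageType = request.imageType;
                    imageData = new ByteArrayOutputStream();
                    return;
                }
                logger.info("receive image chunk with size: " + request.chunk.length);
                if (imageData == null) {
                    // Real gRPC code would report an INVALID_ARGUMENT status here.
                    responseObserver.onError(new IllegalArgumentException("image info wasn't sent before"));
                    return;
                }
                imageData.write(request.chunk, 0, request.chunk.length);
            }

            @Override
            public void onError(Throwable t) {
                logger.warning(t.getMessage());
            }

            @Override
            public void onCompleted() {
                if (imageData == null) {
                    return; // nothing usable was received
                }
                // Stand-in for imageStore.Save(laptopID, imageType, imageData).
                String imageID = UUID.randomUUID().toString();
                responseObserver.onNext(new UploadResponse(imageID, imageData.size()));
                responseObserver.onCompleted();
            }
        };
    }
}
```

In the real service, the errors would be built from a gRPC Status, and the returned observer would be driven by the gRPC runtime instead of being called by hand.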
Now we need to update the LaptopServer a bit.
First change this store to laptopStore.
Add a new imageStore to this constructor.
Then pass it into this LaptopService.
Do the same for this constructor.
In the main function,
We also change the store variable to laptopStore.
And create a new DiskImageStore with “img” as the image folder.
Then pass it into the new LaptopServer constructor.
The img folder is already here,
So we’re all set.
Let’s run the server.
Now let’s try to call this server
using the Golang client that we wrote in the previous video.
The laptop image is successfully uploaded.
We can see it in the img folder.
So it works!
Now we will implement the Java client.
We cannot use the blockingStub to call the client-streaming RPC,
Instead, we will need an asynchronous stub.
So let’s define it here.
And initialize it inside this constructor
By calling LaptopServiceGrpc.newStub().
Now define an uploadImage() function with 2 input parameters:
A laptop ID, and an image path.
In the main function,
I’m gonna comment out this block of code that tests create and search laptop,
Which we wrote in the previous lectures,
And add new code to test upload image here.
First we generate a new random laptop.
We call client.createLaptop() to create this laptop on the server.
Then we call client.uploadImage() with the laptop ID
and a laptop.jpg file inside the tmp folder.
Let’s create that tmp folder,
And copy the laptop.jpg file from the golang project to that folder.
Alright, it’s here.
Now in the uploadImage() function,
We call asyncStub.withDeadlineAfter() with 5 seconds, then uploadImage(),
And pass in a new StreamObserver of UploadImageResponse here.
The output of this call will be another StreamObserver of UploadImageRequest.
In the onNext() function, we just write a simple log
Saying we’ve received this response from the server.
In the onError() function, we write a SEVERE log: upload failed.
Note that the stub is asynchronous,
Which means that the send request part and
the receive response part are run asynchronously.
Because of this, we need to use a CountDownLatch
To wait until the whole process is completed.
Here we just use a count of 1 because we only need to wait for the response thread.
OK, now if an error occurs,
we will call countDown() inside the onError() function.
Similarly, in the onCompleted() function,
We also write a log,
And call finishLatch.countDown().
At the end of the uploadImage() function,
We call finishLatch.await() to wait for the response thread to finish.
Here we only wait for at most 1 minute,
Which is more than enough
Because above we set the deadline of the call to be 5 seconds.
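The latch pattern on its own can be sketched with only the standard library. Here a plain thread plays the role of the gRPC response thread, and LatchSketch and waitForResponse are hypothetical names used just for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchSketch {
    // Returns true if the simulated response arrived before the timeout.
    static boolean waitForResponse() throws InterruptedException {
        // Count of 1: we only wait for the single response thread.
        CountDownLatch finishLatch = new CountDownLatch(1);

        // Stand-in for the thread on which onError() or onCompleted() would run.
        Thread responseThread = new Thread(() -> {
            // ... the response or the error would be logged here ...
            finishLatch.countDown();
        });
        responseThread.start();

        // Blocks until countDown() is called, or gives up after 1 minute.
        return finishLatch.await(1, TimeUnit.MINUTES);
    }
}
```

await() returns false if the timeout elapses first, so the caller can tell a finished call from one that never came back.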
Next we will create a new FileInputStream to read the image file.
If we catch an exception,
Just write a SEVERE log and return.
Else we get the image type from the image file extension.
We build a new image info with the laptop ID and image type.
We create a new UploadImageRequest with the image info.
And call requestObserver.onNext() to send the request to the server.
Surround this with a try-catch.
If there’s an exception, we write a SEVERE log,
Call requestObserver.onError() to report it to the server.
Finally we call requestObserver.onCompleted().
Inside the try catch block,
After we’ve sent the image info,
We will start sending the image data in chunks.
Each chunk will be 1 kilobyte,
So we create a new byte buffer with the size of 1024.
We use a while loop here to read and send data multiple times.
I will need to pull this fileInputStream variable out.
Then here we can call fileInputStream.read() to read more data into the buffer.
It will return the number of bytes read.
Assign it to n.
If n is less than or equal to 0, then it’s the end of the file,
We can safely break the loop.
Now we check whether the latch has already finished because of some unexpected error.
If it has, we don’t need to send more data, so we just return.
Otherwise, we make a new request with the chunk data.
Here we just copy the first n bytes from the buffer.
As before,
We call requestObserver.onNext() to send the request to the server.
And write a log saying that the chunk with this size was sent.
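The read loop can be sketched without gRPC like this. ChunkSketch, imageType, and readChunks are hypothetical names, and the chunks are collected into a list where the real client would wrap each one in a request and pass it to requestObserver.onNext().

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ChunkSketch {
    // Returns the file extension including the dot, e.g. ".jpg"; empty if there is none.
    static String imageType(String imagePath) {
        int dot = imagePath.lastIndexOf('.');
        return dot < 0 ? "" : imagePath.substring(dot);
    }

    // Reads the stream in pieces of at most chunkSize bytes.
    static List<byte[]> readChunks(InputStream in, int chunkSize) throws IOException {
        List<byte[]> chunks = new ArrayList<>();
        byte[] buffer = new byte[chunkSize];
        while (true) {
            int n = in.read(buffer);
            if (n <= 0) {
                break; // end of file
            }
            // Copy only the first n bytes; the rest of the buffer holds stale data.
            chunks.add(Arrays.copyOf(buffer, n));
        }
        return chunks;
    }
}
```

Copying only the first n bytes matters for the last chunk, which is usually shorter than the buffer.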
We’re done with the client.
Now let’s run the server.
And run the client.
The image is successfully uploaded.
And we got this response with image ID and image size.
The logs on the server side look good.
And we can see the laptop image inside the img folder.
Now let’s say, we want to put a constraint on the maximum size of the image.
For example, only allow uploading images with a size of at most 1 kilobyte.
Then, in the onNext() function,
Before writing the chunk to the image data,
We compute the current size of the image.
If it is greater than the maximum allowed size,
Then we write a log “image is too large”,
We report the error to the client with INVALID_ARGUMENT status.
And return right away.
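A small sketch of that size check, in isolation. I’m assuming here that the size is computed as the bytes received so far plus the incoming chunk. SizeLimitSketch, MAX_IMAGE_SIZE, and appendChunk are hypothetical names, and the boolean return value stands in for sending the INVALID_ARGUMENT error and returning.

```java
import java.io.ByteArrayOutputStream;

class SizeLimitSketch {
    static final int MAX_IMAGE_SIZE = 1 << 10; // 1 kilobyte

    // Appends the chunk and returns true, or returns false if the image would be too large.
    static boolean appendChunk(ByteArrayOutputStream imageData, byte[] chunk) {
        // Assumption: the size counts what we already have plus the new chunk.
        int size = imageData.size() + chunk.length;
        if (size > MAX_IMAGE_SIZE) {
            return false; // the server would report "image is too large" here
        }
        imageData.write(chunk, 0, chunk.length);
        return true;
    }
}
```

With this check, a first chunk of exactly 1 kilobyte is accepted, and any chunk after it is rejected.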
OK let’s try it.
Run the server.
Then run the client.
As you can see, some chunks are sent to the server
And we got an INVALID_ARGUMENT error: image is too large.
So it works.
Note that the send part and receive part are parallel,
So it’s totally possible that the client will send more than 2 chunks
Before it receives the error from the server and stops sending more.
As a result, we will see a warning like this on the server side
Because the server has already closed the stream when it sent the error to the client.
OK the last thing before we finish,
When we receive the image info,
We need to check that the laptop ID exists in the store.
To do so, we just call laptopStore.Find(laptopID)
If the laptop is not found,
We simply call responseObserver.onError() with Status NOT_FOUND.
On the client side, we can comment out this command
So that the laptop is not created on the server.
Alright, now let’s run the server.
And run the client.
We got the not found error.
So it’s working as expected.
And that’s it for today’s video about client-streaming RPC.
In the next lecture,
we will learn how to implement the last type of gRPC call,
Which is bidirectional streaming.
I hope the course is useful for you so far.
Thank you for watching, and see you later!