Follow US:

Practice English Speaking&Listening with: [gRPC] Lecture 12 - P2: Upload file in chunks with client-streaming gRPC - Java

Difficulty: 0

Hello and welcome back to the gRPC course.

In this lecture, we will learn how to implement client-streaming RPC with Java

Were gonna implement an API

that allows clients to upload a laptop image file in multiple chunks.

OK lets start!

This is the pcbook-java project that weve been working on.

The first thing we need to do is to define the new upload image RPC.

As weve already done that in the previous video with Golang,

I will just open the pcbook golang project,

And copy-paste the content of the laptop_service.proto file.

Here we have the UploadImageRequest message.

It has a oneof data field,

which can either be image info, or a chunk of image data.

The ImageInfo contains the laptop ID and image type such as .jpg or .png

The chunk_data is a sequence of bytes.

The idea is that we will divide the image into multiple chunks of 1 kilobyte

And send them to the server sequentially via the stream.

Then the server will send back 1 single response,

Which contains the ID of the uploaded image

And the total size of that image.

So the UploadImage RPC will take a stream of UploadImageRequest as input

And return a UploadImageResponse.

Alright, lets build the project to generate Java codes.

The build is successful.

Now before we implement the RPC,

We will need to add a new store to save the uploaded image.

I will create a new ImageStore interface.

It has 1 function: Save

Which takes the laptopID, the imageType, and the imageData as input

And returns the imageID, or throws out an IOException.

Lets say we want to store the image on disk, and its metadata on memory.

So I will create a new DiskImageStore class to implement this interface.

In this class, we need a field to tell us where to store the images.

We also need a concurrent map to store the metadata of the images.

The key of the map is the image ID, and its value is the metadata.

I will create a new class for the ImageMetadata.

In this class, we will store the laptop ID,

The type of the image,

And the path to the image on disk.

Lets write a constructor to initialize the object.

And also create some getter functions for each of the fields.

OK, now go back to our DiskImageStore.

First we create a new constructor that takes only the imageFolder as input.

We initialize the data map with a new ConcurrentHashMap.

Then in the Save function,

We generate a random UUID that will be used as the image ID.

We make the path to store the image

by joining the imageFolder, imageID, and imageType together.

Then we create a new FileOutputStream with the image path.

We call imageData.writeTo() to write the image data to that file output stream.

And close the output stream.

Once the file is successfully written to disk,

We create a new metadata object,

And put it to the data map with the imageID key.

Finally, we return the imageID.

And were done with the DiskImageStore.

Now lets implement the uploadImage RPC in the LaptopService class.

First I will change this store field to laptopStore.

Then we will add a new field for the imageStore.

Also add it to this constructor.

OK, now we need to override the uploadImage() method.

As you can see, this method has a responseObserver parameter

That will be used to send the response to the client,

Just like the way it works in the searchLaptop RPC.

How about the stream of requests?

This is very different from server-streaming RPC,

Because its not an input parameter, but the return value of this function instead.

Here we can see that

the uploadImage function must return a StreamObserver of UploadImageRequest

And this StreamObserver is just an interface with 3 functions:

onNext, onError, and onCompleted.

What we need to do is to return an implementation of this interface.

So lets do that.

First we define 3 fields: laptopID, imageType, and imageData.

Now in the onNext() function,

We check the data case.

If it is image info,

We write a simple log saying that we have received the image info.

Then we get the laptopID and imageType from that info.

We also initialize the imageData as a new ByteArrayOutputStream.

And return.

Else, it must be a new data chunk.

So we get the chunk from the request.

Write a log here saying that weve received a chunk with this size.

Then we check if the imageData is null or not.

If it is null, it means that the client hasnt sent the image info

So we just send an error with INVALID_ARGUMENT status,

And return immediately.

Otherwise, we just call chunkData.writeTo() function

To add this chunk to the image data.

If we catch an exception,

just send an INTERNAL error to the client

and return.

Thats it for the onNext() function.

The onError() function is called

whenever an error occurs while the server is receiving stream data.

So here we just need to write a warning log.

OK, now lets implement the onCompleted() function.

When this function is called,

it means that the server has received all image chunk data.

So we just call imageStore.Save() to save the image data to the store.

Surround this call with a try-catch.

If an error is caught,

We call responseObserver.onError() to send it to the client.

We save the output imageID to a variable,

And also get the total image size.

Then we build a new UploadImageResponse object

With the imageID and imageSize.

We call responseObserver.onNext() to send the response to the client,

And finally call responseObserver.onCompleted() to finish it.

OK the uploadImage RPC is ready.

Now we need to update the LaptopServer a bit.

First change this store to laptopStore.

Add a new imageStore to this constructor.

Then pass it into this LaptopService.

Do the same for this constructor.

In the main function,

We also change the store variable to laptopStore.

And create a new DiskImageStore with the image folder isimg

Then pass it into the new LaptopServer constructor.

The img folder is already here,

So were all set.

Lets run the server.

Now lets try to call this server

using the Golang client that we wrote in the previous video.

The laptop image is successfully uploaded.

We can see it in the img folder.

So it works!


Now we will implement the Java client.

We cannot use the blockingStub to call the client-streaming RPC,

Instead, we will need an asynchronous stub.

So lets define it here.

And initialize it inside this constructor

By calling LaptopServiceGrpc.newStub().


Now define a uploadImage() function with 2 input parameters:

A laptop ID, and an image path.

In the main function,

Im gonna comment out this block of codes to test create and search laptop

That we wrote in the previous lectures.

Add add new codes to test upload image here.

First we generate a new random laptop.

We call client.createLaptop() to create this laptop on the server.

Then we call client.uploadImage() with the laptop ID

and a laptop.jpg file inside the tmp folder.

Lets create that tmp folder,

And copy the laptop.jpg file from the golang project to that folder.

Alright, its here.

Now in the uploadImage() function,

We call asyncStub.withDeadlineAfter 5 seconds

Then .uploadImage()

We create a new StreamObserver of UploadImageResponse here.

The output of this call will be another StreamObserver of UploadImageRequest.

In the onNext() function, we just write a simple log

Saying weve received this response from the server.

In the onError() function, we write a SEVERE log: upload failed.

Note that the stub is asynchronous,

Which means that the send request part and

the receive response part are run asynchronously.

Because of this, we need to use a CountDownLatch()

To wait until the whole process is completed.

Here we just use a count of 1 because we only need to wait for the response thread.

OK, now if an error occurs,

we will call countDown() inside the onError() function.

Similarly, in the onCompleted() function,

We also write a log,

And call finishLatch.countDown()

At the end of the uploadImage() function,

We call finishLatch.await() to wait for the response thread to finish.

Here we only wait for at most 1 minute,

Which is more than enough

Because above we set the deadline of the call to be 5 seconds.

Next we will create a new FileInputStream to read the image file.

If we catch an exception,

Just write a SEVERE log and return.

Else we get the image type from the image file extension.

We build a new image info with the laptop ID and image type.

We create a new UploadImageRequest with the image info.

And call requestObserver.onNext() to send the request to the server.

Surround this with a try-catch.

If theres an exception, we write a SEVERE log,

Call requestObserver.onError() to report it to the server.

And return.

Finally we call requestObserver.onCompleted()

Inside the try catch block,

After weve sent the image info,

We will start sending the image data in chunks.

Each chunk will be 1 kilobyte,

So we create a new byte buffer with the size of 1024.

We use a while loop here to read and send data multiple times.

I will need to pull this fileInputStream variable out.

Then here we can call to read more data into the buffer.

It will return the number of bytes read.

Assign it to n.

If n is less than or equal to 0, then its the end of file

We can safely break the loop.

Now we check if the latch has already finished because of some unexpected error,

Then we dont need to send more data, so just return.

Otherwise, we make a new request with the chunk data.

Here we just copy the first n bytes from the buffer.

Similar as before,

We call requestObserver.onNext() to send the request to the server.

And write a log saying that the chunk with this size was sent.

Thats it!

Were done with the client.

Now lets run the server.

And run the client.

The image is successfully uploaded.

And we got this response with image ID and image size.

The logs on the server side look good.

And we can see the laptop image inside the img folder.

Now lets say, we want to put a constraint on the maximum size of the image.

For example, only allow upload images with size of at most 1 kilobyte.

Then, in the onNext() function,

Before writing the chunk to the image data,

We compute the current size of the image.

If it is greater than the maximum allowed size,

Then we write a logimage is too large

We report the error to the client with INVALID_ARGUMENT status.

And return right away.

OK lets try it.

Run the server.

Then run the client.

As you can see, some chunks are sent to the server

And we got an INVALID_ARGUMENT error: image is too large.

So it works.

Note that the send part and receive part are parallel,

So its totally possible that the client will send more than 2 chunks

Before it receives the error from the server and stops sending more.

As a result, we will see a warning like this on the server side

Because the server has already closed the stream when it sent the error to the client.

OK the last thing before we finish,

When we receive the image info,

We need to check that the laptop ID exists in the store.

To do so, we just call laptopStore.Find(laptopID)

If the laptop is not found,

We simply call responseObserver.onError() with Status NOT_FOUND.

On the client side, we can comment out this command

So that the laptop is not created on the server.

Alright, now lets run the server.

And run the client.

We got the not found error.

So its working as expected.

And thats it for todays video about client-streaming RPC.

In the next lecture,

we will learn how to implement the last type of gRPC,

Which is bidirectional streaming.

I hope the course is useful for you so far.

Thank you for watching, and see you later!

The Description of [gRPC] Lecture 12 - P2: Upload file in chunks with client-streaming gRPC - Java